国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

渣渣的 ElasticSearch 源碼解析 —— 啟動流程(下)

ztyzz / 1848人閱讀

摘要:關(guān)注我轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為前提上篇文章寫完了流程啟動的一部分,方法都入口,以及創(chuàng)建運(yùn)行的必須環(huán)境以及相關(guān)配置,接著就是創(chuàng)建該環(huán)境的節(jié)點(diǎn)了。的創(chuàng)建看下新建節(jié)點(diǎn)的代碼代碼比較多,這里是比較關(guān)鍵的地方,我就把注釋直接寫在代碼上面了,實(shí)在不好

關(guān)注我

轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/08/12/es-code03/

前提

上篇文章寫完了 ES 流程啟動的一部分,main 方法都入口,以及創(chuàng)建 Elasticsearch 運(yùn)行的必須環(huán)境以及相關(guān)配置,接著就是創(chuàng)建該環(huán)境的節(jié)點(diǎn)了。

Node 的創(chuàng)建

看下新建節(jié)點(diǎn)的代碼:(代碼比較多,這里是比較關(guān)鍵的地方,我就把注釋直接寫在代碼上面了,實(shí)在不好拆開這段代碼,300 多行代碼)

public Node(Environment environment) {
        this(environment, Collections.emptyList()); //執(zhí)行下面的代碼
    }

protected Node(final Environment environment, Collection> classpathPlugins) {
    final List resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    {
// use temp logger just to say we are starting. we can"t use it later on because the node name might not be set
        Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
        logger.info("initializing ...");
    }
    try {
        originalSettings = environment.settings();
        Settings tmpSettings = Settings.builder().put(environment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

// create the node environment as soon as possible, to recover the node id and enable logging
        try {
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment); //1、創(chuàng)建節(jié)點(diǎn)環(huán)境,比如節(jié)點(diǎn)名稱,節(jié)點(diǎn)ID,分片信息,存儲元,以及分配內(nèi)存準(zhǔn)備給節(jié)點(diǎn)使用
            resourcesToClose.add(nodeEnvironment);
        } catch (IOException ex) {
        throw new IllegalStateException("Failed to create node environment", ex);
        }
        final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
        final String nodeId = nodeEnvironment.nodeId();
        tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
        final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
// this must be captured after the node name is possibly added to the settings
        final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
        if (hadPredefinedNodeName == false) {
            logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
        } else {
            logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
        }

        //2、打印出JVM相關(guān)信息
        final JvmInfo jvmInfo = JvmInfo.jvmInfo();
        logger.info(
"version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
            Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
            jvmInfo.pid(), Build.CURRENT.flavor().displayName(),
            Build.CURRENT.type().displayName(), Build.CURRENT.shortHash(),
            Build.CURRENT.date(), Constants.OS_NAME, Constants.OS_VERSION,
            Constants.OS_ARCH,Constants.JVM_VENDOR,Constants.JVM_NAME,
            Constants.JAVA_VERSION,Constants.JVM_VERSION);
        logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
        //檢查當(dāng)前版本是不是 pre-release 版本(Snapshot),
        warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
        。。。
        this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);   //3、利用PluginsService加載相應(yīng)的模塊和插件
        this.settings = pluginsService.updatedSettings();
        localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

// create the environment based on the finalized (processed) view of the settings
// this is just to makes sure that people get the same settings, no matter where they ask them from
        this.environment = new Environment(this.settings, environment.configFile());
        Environment.assertEquivalent(environment, this.environment);

        final List> executorBuilders = pluginsService.getExecutorBuilders(settings);        //線程池

        final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
        resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
        // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
        DeprecationLogger.setThreadContext(threadPool.getThreadContext());
        resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));

        final List> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());       //額外配置
        final List additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
        for (final ExecutorBuilder builder : threadPool.builders()) {
            //4、加載一些額外配置
            additionalSettings.addAll(builder.getRegisteredSettings());
        }
        client = new NodeClient(settings, threadPool);//5、創(chuàng)建一個(gè)節(jié)點(diǎn)客戶端                                                                                  

        //6、緩存一系列模塊,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule
        final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
        final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
        AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
        // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool so we might be late here already
        final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
        resourcesToClose.add(resourceWatcherService);
        final NetworkService networkService = new NetworkService(
  getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
        List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
        final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,                                                      ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
        clusterService.addStateApplier(scriptModule.getScriptService());
        resourcesToClose.add(clusterService);
        final IngestService ingestService = new IngestService(settings, threadPool, this.environment,                                                  scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
        final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client);
        final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
listener::onNewInfo);
        final UsageService usageService = new UsageService(settings);

        ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.createGuiceModules()) {
            modules.add(pluginModule);
        }
        final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
        ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
        modules.add(clusterModule);
        IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
        modules.add(indicesModule);

        SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
        CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
                                                                                  settingsModule.getClusterSettings());
        resourcesToClose.add(circuitBreakerService);
        modules.add(new GatewayModule());

        PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
        BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
        resourcesToClose.add(bigArrays);
        modules.add(settingsModule);
        List namedWriteables = Stream.of(
            NetworkModule.getNamedWriteables().stream(),
            indicesModule.getNamedWriteables().stream(),
            searchModule.getNamedWriteables().stream(),
            pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.getNamedWriteables().stream()),
            ClusterModule.getNamedWriteables().stream())
            .flatMap(Function.identity()).collect(Collectors.toList());
        final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
        NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
            NetworkModule.getNamedXContents().stream(),
            searchModule.getNamedXContents().stream(),
            pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.getNamedXContent().stream()),
            ClusterModule.getNamedXWriteables().stream())
.flatMap(Function.identity()).collect(toList()));
        modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
        final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
        final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
analysisModule.getAnalysisRegistry(),                                                                clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),client, metaStateService);

        Collection pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,namedWriteableRegistry).stream())
.collect(Collectors.toList());

        ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                                                     settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
        modules.add(actionModule);

        //7、獲取RestController,用于處理各種Elasticsearch的rest命令,如_cat,_all,_cat/health,_clusters等rest命令(Elasticsearch稱之為action)
        final RestController restController = actionModule.getRestController();
        final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,networkService, restController);
        Collection>> customMetaDataUpgraders =
            pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getCustomMetaDataUpgrader)
            .collect(Collectors.toList());
        Collection>> indexTemplateMetaDataUpgraders =
            pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getIndexTemplateMetaDataUpgrader)
            .collect(Collectors.toList());
        Collection> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
        final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
        final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,                                                                                            indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
        final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,                                                      metaDataIndexUpgradeService, metaDataUpgrader);
        new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
        final Transport transport = networkModule.getTransportSupplier().get();
        Set taskHeaders = Stream.concat(
            pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
            Stream.of("X-Opaque-Id")
        ).collect(Collectors.toSet());
        final TransportService transportService = newTransportService(settings, transport, threadPool,
                                                                      networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
        final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
        final SearchTransportService searchTransportService =  new SearchTransportService(settings, transportService,
                                                                                          SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
        final Consumer httpBind;
        final HttpServerTransport httpServerTransport;
        if (networkModule.isHttpEnabled()) {
            httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
            httpBind = b -> {
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
            };
        } else {
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
            };
            httpServerTransport = null;
        }

        final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),clusterModule.getAllocationService());
        
        this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,searchTransportService);

        final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),responseCollectorService);

        final List> tasksExecutors = pluginsService
            .filterPlugins(PersistentTaskPlugin.class).stream()
     .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
            .flatMap(List::stream)
            .collect(toList());

        final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
        final PersistentTasksClusterService persistentTasksClusterService =
            new PersistentTasksClusterService(settings, registry, clusterService);
        final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);

//8、綁定處理各種服務(wù)的實(shí)例,這里是最核心的地方,也是Elasticsearch能處理各種服務(wù)的核心.
        modules.add(b -> {
            b.bind(Node.class).toInstance(this);
            b.bind(NodeService.class).toInstance(nodeService);
            b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
            b.bind(PluginsService.class).toInstance(pluginsService);
            b.bind(Client.class).toInstance(client);
            b.bind(NodeClient.class).toInstance(client);
            b.bind(Environment.class).toInstance(this.environment);
            b.bind(ThreadPool.class).toInstance(threadPool);
            b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
 b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
            b.bind(BigArrays.class).toInstance(bigArrays);
      b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
 b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
            b.bind(IngestService.class).toInstance(ingestService);
            b.bind(UsageService.class).toInstance(usageService);
 b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
            b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
            b.bind(MetaStateService.class).toInstance(metaStateService);
            b.bind(IndicesService.class).toInstance(indicesService);
            b.bind(SearchService.class).toInstance(searchService);            b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, searchService::createReduceContext));
            b.bind(Transport.class).toInstance(transport);
            b.bind(TransportService.class).toInstance(transportService);
            b.bind(NetworkService.class).toInstance(networkService);
            b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
            b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
            b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
            b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
            {
                RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
indicesService, recoverySettings));
                b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
transportService, recoverySettings, clusterService));
            }
            httpBind.accept(b);
            pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);       b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry); });
        injector = modules.createInjector();

        // TODO hack around circular dependencies problems in AllocationService
clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));

        List pluginLifecycleComponents = pluginComponents.stream()
            .filter(p -> p instanceof LifecycleComponent)
            .map(p -> (LifecycleComponent) p).collect(Collectors.toList());

        //9、利用Guice將各種模塊以及服務(wù)(xxxService)注入到Elasticsearch環(huán)境中
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()                                     .map(injector::getInstance).collect(Collectors.toList()));
        resourcesToClose.addAll(pluginLifecycleComponents);
        this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
        client.initialize(injector.getInstance(new Key>() {}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());

        if (NetworkModule.HTTP_ENABLED.get(settings)) { //如果elasticsearch.yml文件中配置了http.enabled參數(shù)(默認(rèn)為true),則會初始化RestHandlers
            logger.debug("initializing HTTP handlers ...");
            actionModule.initRestHandlers(() -> clusterService.state().nodes()); //初始化RestHandlers, 解析集群命令,如_cat/,_cat/health
        }
        //10、初始化工作完成
        logger.info("initialized");

        success = true;
    } catch (IOException ex) {
        throw new ElasticsearchException("failed to bind service", ex);
    } finally {
        if (!success) {
            IOUtils.closeWhileHandlingException(resourcesToClose);
        }
    }
}

上面代碼真的很多,這里再說下上面這么多代碼主要干了什么吧:(具體是哪行代碼執(zhí)行的如下流程,上面代碼中也標(biāo)記了)

1、創(chuàng)建節(jié)點(diǎn)環(huán)境,比如節(jié)點(diǎn)名稱,節(jié)點(diǎn) ID,分片信息,存儲元,以及分配內(nèi)存準(zhǔn)備給節(jié)點(diǎn)使用

2、打印出 JVM 相關(guān)信息

3、利用 PluginsService 加載相應(yīng)的模塊和插件,具體哪些模塊可以去 modules 目錄下查看

4、加載一些額外的配置參數(shù)

5、創(chuàng)建一個(gè)節(jié)點(diǎn)客戶端

6、緩存一系列模塊,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule

7、獲取 RestController,用于處理各種 Elasticsearch 的 rest 命令,如 _cat, _all, _cat/health, _clusters 等 rest命令

8、綁定處理各種服務(wù)的實(shí)例

9、利用 Guice 將各種模塊以及服務(wù)(xxxService)注入到 Elasticsearch 環(huán)境中

10、初始化工作完成(打印日志)

JarHell 報(bào)錯(cuò)解釋

前一篇閱讀源碼環(huán)境搭建的文章寫過用 JDK 1.8 編譯 ES 源碼是會遇到如下異常:

org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: jar hell!

這里說下就是 setup 方法中的如下代碼導(dǎo)致的

try {
    // look for jar hell
    final Logger logger = ESLoggerFactory.getLogger(JarHell.class);
    JarHell.checkJarHell(logger::debug);
} catch (IOException | URISyntaxException e) {
    throw new BootstrapException(e);
}

所以你如果是用 JDK 1.8 編譯的,那么就需要把所有的有這塊的代碼給注釋掉就可以編譯成功的。

我自己試過用 JDK 10 編譯是沒有出現(xiàn)這里報(bào)錯(cuò)的。

正式啟動 ES 節(jié)點(diǎn)

回到上面 Bootstrap 中的靜態(tài) init 方法中,接下來就是正式啟動 elasticsearch 節(jié)點(diǎn)了:

INSTANCE.start();  //調(diào)用下面的 start 方法

private void start() throws NodeValidationException {
    node.start();                                       //正式啟動 Elasticsearch 節(jié)點(diǎn)
    keepAliveThread.start();
}

接下來看看這個(gè) start 方法里面的 node.start() 方法源碼:

public Node start() throws NodeValidationException {
    if (!lifecycle.moveToStarted()) {
        return this;
    }

    Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
    logger.info("starting ...");
    pluginLifecycleComponents.forEach(LifecycleComponent::start); 
    
    //1、利用Guice獲取上述注冊的各種模塊以及服務(wù)
    //Node 的啟動其實(shí)就是 node 里每個(gè)組件的啟動,同樣的,分別調(diào)用不同的實(shí)例的 start 方法來啟動這個(gè)組件, 如下:
    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(RoutingService.class).start();
    injector.getInstance(SearchService.class).start();
    nodeService.getMonitorService().start();

    final ClusterService clusterService = injector.getInstance(ClusterService.class);

    final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);

    injector.getInstance(ResourceWatcherService.class).start();
    injector.getInstance(GatewayService.class).start();
    Discovery discovery = injector.getInstance(Discovery.class);
    clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
    transportService.start();
    assert localNodeFactory.getNode() != null;
    assert transportService.getLocalNode().equals(localNodeFactory.getNode())
        : "transportService has a different local node than the factory provided";
    final MetaData onDiskMetadata;
    try {
        // we load the global state here (the persistent part of the cluster state stored on disk) to
        // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
        if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {//根據(jù)配置文件看當(dāng)前節(jié)點(diǎn)是master還是data節(jié)點(diǎn)
            onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
        } else {
            onDiskMetadata = MetaData.EMPTY_META_DATA;
        }
        assert onDiskMetadata != null : "metadata is null but shouldn"t"; // this is never null
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService
        .filterPlugins(Plugin
        .class)
        .stream()
        .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

    //2、將當(dāng)前節(jié)點(diǎn)加入到一個(gè)集群簇中去,并啟動當(dāng)前節(jié)點(diǎn)
    clusterService.addStateApplier(transportService.getTaskManager());
    // start after transport service so the local disco is known
    discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
    clusterService.start();
    assert clusterService.localNode().equals(localNodeFactory.getNode())
        : "clusterService has a different local node than the factory provided";
    transportService.acceptIncomingRequests();
    discovery.startInitialJoin();
    // tribe nodes don"t have a master so we shouldn"t register an observer         s
    final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
    if (initialStateTimeout.millis() > 0) {
        final ThreadPool thread = injector.getInstance(ThreadPool.class);
        ClusterState clusterState = clusterService.state();
        ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
        if (clusterState.nodes().getMasterNodeId() == null) {
            logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
            final CountDownLatch latch = new CountDownLatch(1);
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) { latch.countDown(); }

                @Override
                public void onClusterServiceClose() {
                    latch.countDown();
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                        initialStateTimeout);
                    latch.countDown();
                }
            }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
            }
        }
    }


    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        injector.getInstance(HttpServerTransport.class).start();
    }

    if (WRITE_PORTS_FILE_SETTING.get(settings)) {
        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
            writePortsFile("http", http.boundAddress());
        }
        TransportService transport = injector.getInstance(TransportService.class);
        writePortsFile("transport", transport.boundAddress());
    }

    logger.info("started");

    pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

    return this;
}

上面代碼主要是:

1、利用 Guice 獲取上述注冊的各種模塊以及服務(wù),然后啟動 node 里每個(gè)組件(分別調(diào)用不同的實(shí)例的 start 方法來啟動)

2、打印日志(啟動節(jié)點(diǎn)完成)

總結(jié)

這篇文章主要把大概啟動流程串通了,講了下 node 節(jié)點(diǎn)的創(chuàng)建和正式啟動 ES 節(jié)點(diǎn)了。因?yàn)槠^多所以拆開成兩篇,先不扣細(xì)節(jié)了,后面流程啟動文章寫完后我們再單一的扣細(xì)節(jié)。

相關(guān)文章

1、渣渣菜雞為什么要看 ElasticSearch 源碼?

2、渣渣菜雞的 ElasticSearch 源碼解析 —— 環(huán)境搭建

3、渣渣菜雞的 ElasticSearch 源碼解析 —— 啟動流程(上)

4、渣渣菜雞的 ElasticSearch 源碼解析 —— 啟動流程(下)

5、Elasticsearch 系列文章(一):Elasticsearch 默認(rèn)分詞器和中分分詞器之間的比較及使用方法

6、Elasticsearch 系列文章(二):全文搜索引擎 Elasticsearch 集群搭建入門教程

7、Elasticsearch 系列文章(三):ElasticSearch 集群監(jiān)控

8、Elasticsearch 系列文章(四):ElasticSearch 單個(gè)節(jié)點(diǎn)監(jiān)控

9、Elasticsearch 系列文章(五):ELK 實(shí)時(shí)日志分析平臺環(huán)境搭建

10、教你如何在 IDEA 遠(yuǎn)程 Debug ElasticSearch

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76904.html

相關(guān)文章

  • 渣的 ElasticSearch 源碼解析 —— 啟動流程(上)

    摘要:總結(jié)這篇文章主要先把大概啟動流程串通,因?yàn)槠^多所以拆開成兩篇,先不扣細(xì)節(jié)了,后面流程啟動文章寫完后我們再單一的扣細(xì)節(jié)。 關(guān)注我 showImg(https://segmentfault.com/img/remote/1460000012730965?w=258&h=258); 轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/08/11/...

    AZmake 評論0 收藏0
  • 渣的 ElasticSearch 源碼解析 —— 環(huán)境搭建

    摘要:注意這個(gè)版本需要和下面的源碼版本一致下載源碼從上下載相應(yīng)版本的源代碼,這里建議用,這樣的話后面你可以隨意切換到的其他版本去。我們看下有哪些版本的找到了目前源碼版本最新的版本的穩(wěn)定版為切換到該版本于是就可以切換到該穩(wěn)定版本了。 關(guān)注我 showImg(https://segmentfault.com/img/remote/1460000012730965?w=258&h=258); 轉(zhuǎn)載...

    wudengzan 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<