Flink 源码分析(二):Standalone 集群启动流程

更新至 Flink 1.9.0 版本

启动脚本

JobManager 启动脚本

执行 bin/jobmanager.sh start 启动 JobManager 流程:

  1. 执行 bin/config.sh 脚本,准备启动 JobManager 所必须的路径、配置和环境变量;
  2. 执行 bin/daemon.sh start standalonesession <参数> 后台启动 JobManager。

在 daemon.sh 脚本中,standalonesession 入口类为 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

TaskManager 启动脚本

执行 bin/taskmanager.sh start 启动 TaskManager 流程:

  1. 执行 bin/config.sh 脚本,准备启动 TaskManager 所必须的路径、配置和环境变量;
  2. 执行 bin/daemon.sh start taskexecutor <参数> 后台启动 TaskManager。

在 daemon.sh 脚本中,taskexecutor 入口类为 org.apache.flink.runtime.entrypoint.org.apache.flink.runtime.taskexecutor.TaskManagerRunner

JobManager 启动

在 StandaloneSessionClusterEntrypoint 的 main 方法里,调用 ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) 静态方法,启动 JobManager 代码:

StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

ClusterEntrypoint.runClusterEntrypoint(entrypoint);  

类 StandaloneSessionClusterEntrypoint 的继承体系:

StandaloneSessionClusterEntrypoint

调用 ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) 静态方法,调用 ClusterEntrypoint 的 void runCluster(Configuration configuration) 实例方法启动 JobManager。流程如下:

  1. 初始化服务;
  2. 创建集群组件。

初始化服务

调用 ClusterEntrypoint 的void initializeServices(Configuration configuration) 实例方法初始化服务。

服务包括:

  • RPC 服务 org.apache.flink.runtime.rpc.RpcService
  • I/O 线程池;
  • HA 服务 org.apache.flink.runtime.highavailability.HighAvailabilityServices
  • Blob 服务 org.apache.flink.runtime.blob.BlobServer
  • 心跳服务 org.apache.flink.runtime.heartbeat.HeartbeatServices
  • MetricRegistry org.apache.flink.runtime.metrics.MetricRegistry
  • ArchivedExecutionGraphStore org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore

创建集群组件

抽象工厂设计模式创建集群组件:

final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

clusterComponent = dispatcherResourceManagerComponentFactory.create(  
    configuration,
    commonRpcService,
    haServices,
    blobServer,
    heartbeatServices,
    metricRegistry,
    archivedExecutionGraphStore,
    new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
    this);

类 SessionDispatcherResourceManagerComponentFactory 继承体系:

SessionDispatcherResourceManagerComponentFactory

服务组件包括:

  • DispatcherLeaderRetriever;
  • ResourceManagerLeaderRetriever;
  • 资源管理器 org.apache.flink.runtime.resourcemanager.ResourceManager
  • WebMonitorEndpoint org.apache.flink.runtime.webmonitor.WebMonitorEndpoint
  • JobManagerMetricGroup org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
  • 调度器 org.apache.flink.runtime.dispatcher.Dispatcher

TaskManager 启动

在 TaskManagerRunner 的 main 方法中,调用 TaskManagerRunner.runTaskManager() 静态方法:

final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);

taskManagerRunner.start();  

在 TaskManagerRunner 构造方法内,初始时化了以下服务:

  • HA 服务 org.apache.flink.runtime.highavailability.HighAvailabilityServices
  • RPC 服务 org.apache.flink.runtime.rpc.RpcService
  • 心跳服务 org.apache.flink.runtime.heartbeat.HeartbeatServices
  • MetricRegistry org.apache.flink.runtime.metrics.MetricRegistry
  • 支持缓存的 Blob 服务 org.apache.flink.runtime.blob.BlobCacheService

并创建了 TaskManager org.apache.flink.runtime.taskexecutor.TaskExecutor 实例,调用了 start 实例方法。

类 TaskExecutor 继承体系:

TaskExecutor

在 TaskExecutor 回调方法 onStart 内,启动了以下服务:

  • ResourceManagerLeaderRetriever;
  • TaskSlot 表 org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable
  • JobLeaderService org.apache.flink.runtime.taskexecutor.JobLeaderService
  • 文件缓存 org.apache.flink.runtime.filecache.FileCache