Worker 启动流程源码走查
Worker 的启动入口位于 org.apache.kafka.connect.cli.ConnectStandalone
类的 main
方法中。让我们从这里开始追踪 Worker 的启动流程:
1 2 3 4 5 6 7 8 9 10 11
| public static void main(String[] args) { ConnectStandalone ConnectStandalone = new ConnectStandalone(props); try { ConnectStandalone.startConnector(); } finally { } }
|
在 ConnectStandalone
构造函数中,会初始化一些辅助组件,如 Kafka AdminClient、MetricsReporter 等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public ConnectStandalone(Map<String, String> props) { String remoteHost = ConfigUtils.getRemoteHost(config); replicaListener = new Listener(config); adminClient = AdminClient.create(adminClientConfigOverrides(config)); this.metrics = ConfigUtils.getMetrics("Connect"); this in.forEachRemaining(m -> addMetric(m, overallMetrics)); String metricReporters = ConfigUtils.getMetricReporters(config); reporters = ConfigUtils.initializeMetricReporters(metricReporters); this.config = config;
plugins = ConfigUtils.plugins(config.getClass().getClassLoader()); }
|
接着在 startConnector()
方法中,会进一步初始化 Herder 并启动它:
1 2 3 4 5 6 7 8
| void startConnector() { String workerId = ConfigUtils.getWorkerId(config); herder = new Herder(config, workerId, replicaListener, metrics, plugins); herder.start(); }
|
总的来说,Worker 启动的核心步骤包括:
- 解析配置参数
- 创建 Herder 及其他辅助组件
- 加载 Connector 插件
- 启动 Herder 加入集群
接下来,我们将重点分析 Herder 这个 Worker 的大脑组件。
Herder 核心类源码分析
Herder 作为 Worker 的核心管理组件,负责管理和协调 Connectors 和 Tasks 的整个生命周期。我们从以下几个方面来剖析 Herder 的实现:
1. Herder 的重要字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public Herder(ConfigDef configDef, String workerId, Listener listener, MetricsReporter metrics, PluginClassLoader pluginLoader) { this.time = new WorkerTime(); this.configTransformer = new WorkerConfigTransformer(configDef); this.workerId = workerId; this.federatedConfig = FederatedConfig.create(configDef); this.workerGroup = federatedConfig.getWorkerId(); this.herderProvider = new DistributedHerderProvider(federatedConfig, listener, metrics, time); WorkerConfigurator configurator = configurator(configDef); this.metrics = new ConnectMetrics(workerId, configurator, time); this.plugins = plugins(pluginLoader); this.configTransformer = new WorkerConfigTransformer(configDef); this.coordinator = herderProvider.getcoordinator(metrics); }
|
Herder 中有几个关键字段:
DistributedHerderProvider
:管理 Worker 集群中的各个 Herder 实例。
ConnectMetrics
:收集和报告 Connect 相关的指标数据。
Plugins
:存储加载的 Connector 插件。
Coordinator
:负责协调 Connectors 和 Tasks 在 Worker 集群中的分布。
2. Herder 启动过程
在 herder.start()
方法中,会执行以下核心逻辑:
1 2 3 4 5 6 7 8 9 10 11
| public void start() { log.info("Herder starting"); herderProvider.startProvider(); coordinator.startCoordinator(); coordinator.joinGroup(); }
|
- 启动
DistributedHerderProvider
和 Coordinator
。
- 调用
Coordinator
的 joinGroup()
方法,将当前 Worker 加入集群,并注册为 Leader 候选者。
在加入集群后,Herder 会循环执行 poll
操作,监听并处理集群事件和请求。
3. Rebalance 机制
Rebalance 是 Kafka Connect 实现高可用和负载均衡的关键机制。在 Herder 中,Rebalance 主要由 Coordinator
组件驱动,核心实现位于 Coordinator
的 poll
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Override public void poll() { try { maybeResolveMissingCoordinators(); maybeRefreshCoordinatorMetrics(); maybeRefreshFencedInstances(); if (needsRebuildPartitionAssignment()) { maybeRebuildPartitionAssignment(); } if (needsConnectorAssignment()) { performConnectorAssignment(); } } catch (...) { } }
|
主要步骤包括:
- 检查是否需要执行 Rebalance。
- 重新计算 Connectors 和 Tasks 在集群中的分布。
- 根据分布计划,向相关 Workers 发送分配或回收 Connectors 和 Tasks 的请求。
这个过程由 Herder 持续轮询执行,以确保集群中的工作负载能够及时重新分布。
除了对 Herder 的核心实现进行分析,本文还展示了 Worker 启动流程和 Rebalance 机制的关键源码片段。在下一篇文章中,我们将继续深入探讨 Connector 端的实现原理。