Kafka Connect源码解读系列(十二) - Rebalance机制深入分析

在分布式集群模式下,Kafka Connect 能够实现高可用和负载均衡,这得益于它内置的 Rebalance 机制。Rebalance 过程会根据集群中活跃的 Worker 实例数量,重新计算并分配 Connectors 和 Tasks 在各个 Worker 之间的分布。本文将基于 Kafka Connect 2.7.0 版本的源码,深入剖析 Rebalance 机制的实现原理和关键流程。

Rebalance 触发条件

Rebalance 可能由以下几种情况触发:

  1. 集群中有新的 Worker 实例加入或离开。
  2. 某个 Worker 实例出现故障或重启。
  3. 用户手动调整了 Connector 或 Task 的配置和数量。

无论哪种情况,Rebalance 的目标都是确保 Connectors 和 Tasks 在集群中的均匀分布,从而实现高可用和负载均衡。

Rebalance 流程源码分析

Rebalance 的核心流程由 DistributedHerderProviderCoordinator 这两个组件共同驱动,我们来看一下关键的源码实现。

1. DistributedHerderProvider

DistributedHerderProvider 是 Herder 的管理者,负责监控集群中的变化并触发 Rebalance。它的 poll() 方法是关键所在:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void poll() {
try {
// 检查是否需要 Rebalance
maybeResolveMissingCoordinators();
maybeRefreshCoordinatorMetrics();
maybeRefreshFencedInstances();

// 如果需要 Rebalance,则通知 Coordinator 执行
if (needsRebuildPartitionAssignment()) {
maybeRebuildPartitionAssignment();
}
} catch (...) {
// ...
}
}

poll() 方法中,DistributedHerderProvider 会检查集群状态,如果发现需要执行 Rebalance,就会调用 maybeRebuildPartitionAssignment() 方法通知 Coordinator 进行分配计算和执行。

2. Coordinator

Coordinator 是 Rebalance 的执行者,它的 poll() 方法实现了 Rebalance 的核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void poll() {
try {
// 检查是否需要执行 Connector 分配
if (needsConnectorAssignment()) {
performConnectorAssignment();
}

// 执行 Rebalance
if (needsRebuildPartitionAssignment()) {
maybeRebuildPartitionAssignment();
}

// ... 其他逻辑
} catch (...) {
// ...
}
}

poll() 方法中,Coordinator 会先检查是否需要执行 Connector 分配操作,如果需要,则调用 performConnectorAssignment() 方法进行分配。

接下来,如果发现需要执行 Rebalance,就会调用 maybeRebuildPartitionAssignment() 方法重新计算 Connectors 和 Tasks 的分布。

我们继续深入 maybeRebuildPartitionAssignment() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void maybeRebuildPartitionAssignment() {
// 获取集群中活跃的 Worker 实例列表
List<String> livingMembers = membership.getMembers();

// 计算新的分配计划
Map<String, WorkerAssignor.Assignment> newAssignment =
workerAssignor.performAssignment(connectors, livingMembers);

// 执行分配计划
for (Map.Entry<String, WorkerAssignor.Assignment> entry : newAssignment.entrySet()) {
String workerId = entry.getKey();
WorkerAssignor.Assignment assignment = entry.getValue();

// 向目标 Worker 发送分配请求
sendAssignmentToWorker(workerId, assignment);
}
}

这个方法的主要步骤包括:

  1. 获取集群中活跃的 Worker 实例列表。
  2. 调用 WorkerAssignor 计算新的分配计划。
  3. 遍历分配计划,向每个 Worker 发送分配请求。

在分配计划的计算过程中,WorkerAssignor 会根据一些策略和算法,将 Connectors 和 Tasks 均匀地分配到各个 Worker 实例上。

3. Worker 执行分配请求

最后,我们来看一下 Worker 如何响应和执行分配请求。以 HerderupdateAssignment() 方法为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void updateAssignment(Assignment assignment) {
// 获取已分配和未分配的 Connectors 和 Tasks
Set<String> assigned = new HashSet<>(assignment.connectors());
Set<ConnectorInfo> realConnectors = connectors.stream()
.filter(info -> assigned.contains(info.name()))
.collect(Collectors.toSet());

Set<ConnectorInfo> unassigned = new HashSet<>(connectors);
unassigned.removeAll(realConnectors);

// 执行分配操作
assignment.connectors().forEach(
connectorName -> updateConnector(realConnectors, connectorName, assignment));
unassigned.forEach(info -> revokeConnector(info));
}

在这个方法中,Worker 会首先获取已分配和未分配的 Connectors 和 Tasks 列表。然后,对于已分配的 Connectors,Worker 会执行更新或创建操作;对于未分配的 Connectors,Worker 会执行撤销操作。

通过上述源码分析,我们深入了解了 Kafka Connect 是如何实现 Rebalance 机制的。这个过程由 DistributedHerderProviderCoordinator 共同协作完成,包括监控集群状态、计算分配计划、执行分配请求等关键步骤。通过 Rebalance,Kafka Connect 能够动态地在集群中重新分布 Connectors 和 Tasks,从而实现高可用和负载均衡。

在下一篇文章中,我们将继续探讨 Kafka Connect 的其他关键特性,如故障转移和恢复机制、Incremental Cooperative Rebalancing 等。