在上一篇文章中,我们分析了 Worker 端的核心实现,包括 Worker 启动流程、Herder 的职责以及 Rebalance 机制的源码实现。本文将继续剖析 Connector 端的相关代码,重点包括 Connector 接口、Source/Sink Connector 的实现、Task 执行流程以及 Connector 生命周期管理。
Connector 接口定义源码分析
Connector 的核心接口定义位于 org.apache.kafka.connect.connector.Connector
接口中,让我们逐一分析其中定义的方法:
1 | public interface Connector { |
这组接口定义了 Connector 在生命周期各个阶段所需要实现的基本功能,例如启动、停止、配置验证、Task 管理等。任何自定义的 Connector 实现都需要遵循这个接口的约定。
Source/Sink Connector 实现源码解读
Kafka Connect 内置了一些常用的 Source 和 Sink Connector 实现,我们以 FileStreamSourceConnector
为例,分析其对 Connector 接口的实现:
1 | public class FileStreamSourceConnector extends SourceConnector { |
可以看到,FileStreamSourceConnector
继承自 SourceConnector
抽象类,并分别实现了 start()
、taskConfigs()
和 config()
等方法。其中,taskConfigs()
方法的实现决定了该 Connector 在运行时会创建多少个 Task 实例。
对于 Sink Connector 的实现,原理类似,只是继承的是 SinkConnector
抽象类。
Task 执行流程核心源码追踪
Task 是 Connector 的实际执行单元,负责与外部系统进行数据传输。以 FileStreamSourceTask
为例,我们来看一下它的主要执行流程:
1 | public class FileStreamSourceTask extends SourceTask { |
FileStreamSourceTask
继承自 SourceTask
抽象类,实现了 start()
、poll()
和 stop()
等方法。在 poll()
方法中,Task 会循环读取外部数据源(这里是文件)中的数据,并将读取到的数据封装为 SourceRecord
对象返回。
对于 Sink Task,执行流程与 Source Task 类似,只是将逻辑反向,从 Kafka 消费记录并写入到外部系统中。
Connector 生命周期管理源码解析
Connector 的生命周期管理由 Herder 组件负责,我们以 Herder
的 putConnectorConfig
方法为例,分析 Connector 生命周期的管理过程:
1 | public AbstractHerder.ConnectorInfo putConnectorConfig( |
该方法首先会验证 Connector 配置的合法性,然后根据 Connector 的当前状态,决定执行以下操作之一:
- 如果 Connector 已存在且不允许重启,则直接返回。
- 如果 Connector 处于非法状态,则抛出异常。
- 如果 Connector 处于可重启状态,则停止当前 Connector 并重新启动一个新实例。
在 putConnectorConfig
内部,还会调用 Connector
接口中定义的生命周期方法,如 start()
、stop()
等,以执行相应的操作。
通过上述源码分析,我们深入了解了 Connector 端的核心实现,包括 Connector 接口定义、Source/Sink Connector 的流程等。