Kafka Connect 提供了 REST 接口,用于外部客户端对 Connectors、Tasks 等资源进行管理和配置。这些 REST 接口由 Connect 进程内置的 HTTP 服务器提供,本文将围绕服务端的实现原理展开源码分析。
REST 服务器初始化
REST 服务器的初始化发生在 ConnectStandalone
类的 startConnector()
方法中,代码如下:
1 | AdvertisedListener listener = createAdvertisedListener(); |
- 创建
AdvertisedListener
对象,用于监听外部请求。 - 创建
ConnectWebServer
实例,作为 REST 服务器的入口点。 - 调用
initializeServer()
方法初始化服务器。 - 调用
initializeResources()
方法注册 REST 资源处理器。
接下来,我们重点分析 ConnectWebServer
中服务器初始化的相关实现。
Jetty 服务器初始化
ConnectWebServer
内部使用 Jetty 作为嵌入式 Web 服务器,其初始化代码如下:
1 | public void initializeServer() { |
该方法执行以下关键步骤:
- 创建 Jetty
Server
实例。 - 配置 Jetty
ServerConnector
。 - 创建
ServletContextHandler
和ServletContainer
。 - 注册 Jersey 服务,用于处理 REST 请求。
通过这些步骤,Jetty 服务器的基本框架就建立起来了。接下来,需要注册具体的 REST 资源处理器。
REST 资源处理器注册
REST 资源处理器的注册发生在 initializeResources()
方法中,代码如下:
1 | public void initializeResources(Herder herder, String advertisedUrl) { |
- 创建
ConnectRestExtension
实例,作为 REST 扩展点。 - 调用
resourceProvider.registerResources()
方法,注册 REST 资源处理器类。 - 启动 Jetty 服务器。
ConnectRestExtension
负责提供所有需要注册的 REST 资源处理器类,这些类通过实现 @Path
注解来定义 REST 端点路径。例如:
1 |
|
ConnectorsResource
类中的 createConnector()
方法对应 /connectors
端点的 POST 请求,用于创建新的 Connector。
通过上述源码分析,我们了解了 Kafka Connect 是如何初始化内置的 REST 服务器,以及如何注册处理不同 REST 端点的资源处理器。在下一篇文章中,我们将继续探讨如何通过 REST Client 调用这些服务端 API。