kafka connector 使用总结以及自定义connector开发

Kafaka connect 是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。Kafka Connect可以从数据库或应用程序服务器收集数据到Kafka topic,使数据可用于低延迟的流处理。导出作业可以将数据从Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。

Kafaka connect的核心组件:
Source:负责将外部数据写入到kafka的topic中。
Sink:负责从kafka中读取数据到自己需要的地方去,比如读取到HDFS,hbase等。
Connectors :通过管理任务来协调数据流的高级抽象
Tasks:数据写入kafk和从kafka中读出数据的具体实现,source和sink使用时都需要Task

kafka connector 使用总结以及自定义connector开发

 

 

 


Workers:运行connectors和tasks的进程

kafka connector 使用总结以及自定义connector开发

 

 

 


Converters:kafka connect和其他存储系统直接发送或者接受数据之间转换数据,

converter会把bytes数据转换成kafka connect内部的格式,也可以把kafka connect内部存储格式的数据转变成bytes,converter对connector来说是解耦的,所以其他的connector都可以重用,例如,使用了avro converter,那么jdbc connector可以写avro格式的数据到kafka,当然,hdfs connector也可以从kafka中读出avro格式的数据。

kafka connector 使用总结以及自定义connector开发

 

 

 


Transforms:一种轻量级数据调整的工具
Kafka connect 工作模式:
Kafka connect 有两种工作模式:
standalone:在standalone模式中,所有的worker都在一个独立的进程中完成。
distributed:distributed模式具有高扩展性,以及提供自动容错机制。你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。

kafka connector 使用总结以及自定义connector开发

 

 

 


在分布式模式下通过rest api来管理connector。
connector的常见管理操作API:
GET /connectors – 返回所有正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

如何开发自己的Connector:

1、引入maven依赖。

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>${kafka.version}</version> </dependency>

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxgsz.html