【Canal源码分析】client工作过程

client的工作过程,需要我们自己去编写对应的逻辑,我们目前只能从example写的例子来看。目前examle中提供了两个例子,一个是单机的,一个是集群的cluster,我们后续如果需要进行开发的话,其实也是开发我们自己的client,以及client的一些逻辑。我们主要看下集群的client是如何实现和消费的,又是怎么和server进行数据交互的。

我们来看看具体的代码:

protected void process() { int batchSize = 5 * 1024; while (running) { try { MDC.put("destination", destination); connector.connect(); connector.subscribe(); waiting = false; while (running) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // } } else { printSummary(message, batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } } catch (Exception e) { logger.error("process error!", e); } finally { connector.disconnect(); MDC.remove("destination"); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzgwyg.html