# easy-canal-client

**Repository Path**: cowboy2014/easy-canal-client

## Basic Information

- **Project Name**: easy-canal-client
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 2
- **Forks**: 0
- **Created**: 2021-10-26
- **Last Updated**: 2021-11-08

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# easy-canal-client

`easy-canal-client` is a project I built using `canal-client-springboot-starter` as a reference.

> The source code is open source: https://gitee.com/cowboy2014/easy-canal-client.git

## 1. System Architecture Diagram

The architecture is sketched below:

![](https://img-blog.csdnimg.cn/img_convert/b1a681e45cca07259c30399ecad2a2f2.png)

## 2. Integration Steps

### 2-1 Upgrade canal server to 1.1.5

canal must be upgraded to `1.1.5`. With this version, canal can deliver the parsed binlog data directly to RabbitMQ, so no intermediate component is needed between canal and the message queue.

Reference configuration for `canal.properties`:

```properties
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ

##################################################
#########           RabbitMQ         #############
##################################################
rabbitmq.host = 172.16.150.11
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = canal
rabbitmq.password = canal%123
rabbitmq.deliveryMode =
```

Reference configuration for `conf/example/instance.properties`:

```properties
# position info
canal.instance.master.address=172.16.150.12:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal%123
# table regex
canal.instance.filter.regex=lpm-center\\.lpm_(park|company|store|route)
# mq config
canal.mq.topic=shangwt
```

### 2-2 Integrate RabbitMQ into the business module

The business module integrates RabbitMQ directly and acts as the consumer that parses and synchronizes the data.

### 2-3 Usage

#### 1. Create a Handler and annotate it with `@CanalTable`

```java
@Component
@CanalTable(value = "lpm_park")
@Slf4j
public class ParkHandler implements EntryHandler<Park> {

    @Resource
    private ElSearchParkServiceImpl elSearchParkService;

    @Resource
    private EsIndexes esIndexes;

    // Called for each row inserted into lpm_park.
    @Override
    public void insert(Park park) {
        log.info("insert message {}", park);
        try {
            elSearchParkService.synchronous(esIndexes.getPark(), park.getId());
        } catch (IOException e) {
            log.error("es insert wrong!", e);
        }
    }

    // Called for each updated row; `before` carries the previous values.
    @Override
    public void update(Park before, Park park) {
        try {
            // Logical delete: the deleted flag changed to 1.
            if (ObjectUtil.isNotEmpty(before.getDeleted())
                    && !before.getDeleted().equals(park.getDeleted())
                    && park.getDeleted() == 1) {
                this.delete(park);
            }
            // Restore: the deleted flag changed back to 0.
            if (ObjectUtil.isNotEmpty(before.getDeleted())
                    && !before.getDeleted().equals(park.getDeleted())
                    && park.getDeleted() == 0) {
                this.insert(park);
            }
            elSearchParkService.synchronous(esIndexes.getPark(), park.getId());
        } catch (IOException e) {
            log.error("es update wrong!", e);
        }
        log.info("update after {}", park);
    }

    // Called for each row physically deleted from lpm_park.
    @Override
    public void delete(Park park) {
        log.info("delete {}", park);
        elSearchParkService.deleteById(esIndexes.getPark(), park.getId().intValue());
    }
}
```

#### 2. Listen to the queue on the RabbitMQ consumer side and call the matching Handler to consume each message

```java
/**
 * Synchronizes canal messages.
 * @param info the flat message delivered by canal
 */
@RabbitListener(queues = {"shangwtQueue"}, containerFactory = "multiListenerContainer")
public void consumeMsg(FlatMessage info) {
    try {
        handlerUtil.handleMessage(info);
    } catch (Exception e) {
        // Pass the exception itself so the original stack trace is preserved.
        log.error("canal message listener failed:", e);
    }
}
```

## 3. Acknowledgements

https://github.com/NormanGyllenhaal/canal-client

This toolkit was adapted from code by the original author (yangpengrc). Many thanks to the original author for the hard work.
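
As a supplement to section 2-3, the following is a minimal sketch of how a listener might route one flat message to the handler registered for its table. It is not the project's `handlerUtil.handleMessage` implementation, which is not shown in this README. `RowHandler` and `FlatMessageDispatchSketch` are hypothetical names, rows are modelled as plain `Map<String, String>` instead of entity classes, and the sketch assumes that the message's `old` list holds only the previous values of the changed columns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the project's EntryHandler. */
interface RowHandler {
    void insert(Map<String, String> row);
    void update(Map<String, String> before, Map<String, String> after);
    void delete(Map<String, String> row);
}

/** Hypothetical dispatcher: one handler per table name. */
final class FlatMessageDispatchSketch {
    private final Map<String, RowHandler> handlersByTable = new HashMap<>();

    void register(String table, RowHandler handler) {
        handlersByTable.put(table, handler);
    }

    /**
     * Routes every row of one message to the handler of its table.
     * Returns the number of rows handed to a handler.
     */
    int dispatch(String table, String type,
                 List<Map<String, String>> data,
                 List<Map<String, String>> old) {
        RowHandler handler = handlersByTable.get(table);
        if (handler == null || type == null || data == null) {
            return 0; // no handler registered, or nothing to process
        }
        int handled = 0;
        for (int i = 0; i < data.size(); i++) {
            Map<String, String> row = data.get(i);
            switch (type) {
                case "INSERT": {
                    handler.insert(row);
                    break;
                }
                case "UPDATE": {
                    // Assumption: old.get(i) contains only the changed columns,
                    // so the "before" image is the new row overlaid with them.
                    Map<String, String> before = new HashMap<>(row);
                    if (old != null && i < old.size() && old.get(i) != null) {
                        before.putAll(old.get(i));
                    }
                    handler.update(before, row);
                    break;
                }
                case "DELETE": {
                    handler.delete(row);
                    break;
                }
                default:
                    continue; // other event types (e.g. DDL) are skipped
            }
            handled++;
        }
        return handled;
    }
}
```

In this sketch a handler would be registered once per table, for example under `"lpm_park"`, and the listener would call `dispatch` with the table, type, data, and old values taken from the received message. Keeping the lookup keyed by table name mirrors the role that `@CanalTable(value = "lpm_park")` plays in the handler shown earlier.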