Using Canal
Install MySQL, Canal, and Elasticsearch with Docker, use Canal to sync MySQL data into Elasticsearch via the binlog, and wire it all into a Spring Boot application:
- Install MySQL and enable the binlog
- Install Elasticsearch
- Install canal-server
- Integrate with Spring Boot
Versions used:
- macOS Monterey
- MySQL 5.7.36
- Elasticsearch 7.17.9
- canal-server 1.1.5
1. Install MySQL
Installation commands:
```shell
# Pull the image
docker pull mysql:5.7.36
# Create the container
docker run --name mysql5.7.36 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36
# Enter the container
docker exec -it mysql5.7.36 /bin/bash
# Install vim
apt-get update
apt-get install vim
# Note: if apt-get fails, try yum instead.
# Configure MySQL
cd /etc/mysql/mysql.conf.d
vim mysqld.cnf
```
Configuration (same as the official docs):
```ini
[mysqld]
# binlog settings
log-bin=mysql-bin   # enable the binlog
binlog-format=ROW   # binlog format
server-id=1         # replication server id; Canal's slaveId must differ from it
```
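As a sanity check, the three settings above can be verified mechanically. The helper below is purely illustrative (the class and its simplified key=value parsing are my own, not part of MySQL or Canal): it checks that the binlog is on, in ROW format, with a server-id set.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: parse a my.cnf-style snippet and check the three
// binlog settings Canal depends on. Parsing is simplified: it skips
// blank lines, comments, and [section] headers.
public class BinlogConfigCheck {

    public static Map<String, String> parse(String cnf) {
        Map<String, String> settings = new HashMap<>();
        for (String line : cnf.split("\n")) {
            line = line.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith("[")) {
                continue; // skip blanks, comments, and [mysqld] headers
            }
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash).trim(); // drop trailing comment
            }
            int eq = line.indexOf('=');
            if (eq > 0) {
                settings.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
            }
        }
        return settings;
    }

    // true when the binlog is enabled, in ROW format, and server-id is set
    public static boolean readyForCanal(String cnf) {
        Map<String, String> s = parse(cnf);
        return s.containsKey("log-bin")
                && "ROW".equalsIgnoreCase(s.getOrDefault("binlog-format", ""))
                && s.containsKey("server-id");
    }
}
```

For example, `readyForCanal` rejects a config whose `binlog-format` is `STATEMENT`, which Canal cannot use for row-level sync.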
Create a `canal` user in the database and grant it the replication slave and dump privileges:
```sql
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
Since MySQL 8.0.3 the authentication plugin is caching_sha2_password, but Canal authenticates with mysql_native_password, so on those versions you must switch the canal user's plugin (skip this on earlier versions); otherwise you will get `IOException: caching_sha2_password Auth failed`:
```sql
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host, user, plugin from mysql.user;
```
Save, exit, and restart the MySQL container:
```shell
docker restart mysql5.7.36
```
At this point you can verify the previous steps:
```shell
# Enter the container
docker exec -it mysql5.7.36 /bin/bash
mysql -uroot -p
```
```sql
show master status;                   -- current binlog file and position
reset master;                         -- reset the binlog (optional)
show variables like 'binlog_format';  -- should show ROW
```
To inspect the binlog file itself:
```shell
cd /var/lib/mysql                  # binlog directory
mysqlbinlog -vv mysql-bin.000001   # view a ROW-format binlog
```
MySQL is now installed and configured.
2. Install Elasticsearch
Omitted.
3. Install canal-server
```shell
docker pull canal/canal-server:v1.1.5
docker run --name canal1.1.5 -p 11111:11111 --link mysql5.7.36:mysql5.7.36 -id canal/canal-server:v1.1.5
```
Edit the instance configuration:
```shell
docker exec -it canal1.1.5 /bin/bash
cd canal-server/conf/example/
vi instance.properties
```
Change the following keys:
```properties
# change 0 to 10 -- any value different from MySQL's server-id works
canal.instance.mysql.slaveId=10
# MySQL address and credentials; mysql5.7.36 is the container's link alias
canal.instance.master.address=mysql5.7.36:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
```
The full `instance.properties` after editing:
```properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=10

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=mysql5.7.36:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
```
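The two filter keys above decide which tables Canal parses: a table is synced when its `schema.table` name matches `canal.instance.filter.regex` and does not match `canal.instance.filter.black.regex`. Here is a rough sketch of that whitelist-then-blacklist check — simplified and my own, since the real Canal filter also supports comma-separated pattern lists and several shorthands. (Note that `\\.` in the properties file is the properties-file escape; the regex itself contains a single backslash.)

```java
import java.util.regex.Pattern;

// Simplified model of Canal's table filtering: a whitelist regex is
// checked first, then a blacklist regex. Only illustrates the idea --
// the real filter handles comma-separated pattern lists and shorthands.
public class TableFilter {
    private final Pattern whitelist;
    private final Pattern blacklist;

    public TableFilter(String whiteRegex, String blackRegex) {
        this.whitelist = Pattern.compile(whiteRegex);
        this.blacklist = Pattern.compile(blackRegex);
    }

    // accepts names of the form "schema.table"
    public boolean accepts(String schemaDotTable) {
        return whitelist.matcher(schemaDotTable).matches()
                && !blacklist.matcher(schemaDotTable).matches();
    }
}
```

With the values from the config above, `test1.t_product` passes the filter while `mysql.slave_master_info` is excluded.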
You can also pin the sync start point precisely (the defaults are fine, so this is optional):
```properties
# binlog file to start syncing from -- use the file name you saw in `show master status`
canal.instance.master.journal.name=mysql-bin.000001
# binlog offset to start from
canal.instance.master.position=0
# or a start time, as a timestamp
canal.instance.master.timestamp=1546272000
# databases to exclude from syncing
canal.instance.filter.black.regex=mysql\\..*
```
How Canal chooses its sync start point:

- `canal.instance.master.journal.name` + `canal.instance.master.position`: start from that exact binlog position.
- `canal.instance.master.timestamp`: Canal walks the binlog, finds the position matching that timestamp, and starts there.
- Nothing set: start from the database's current position (`show master status`).
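The start-point rules above are a simple precedence chain, which can be sketched as a small helper. This is an illustration of the selection logic only, not a Canal API — the class name and return values are made up:

```java
// Illustrative helper (not a Canal API): given the three optional
// start-position properties, report which start mode applies.
public class StartMode {

    public static String resolve(String journalName, String position, String timestamp) {
        if (notBlank(journalName) && notBlank(position)) {
            return "BINLOG_POSITION"; // exact file + offset
        }
        if (notBlank(timestamp)) {
            return "TIMESTAMP";       // scan the binlog for this time
        }
        return "CURRENT";             // fall back to `show master status`
    }

    private static boolean notBlank(String s) {
        return s != null && !s.trim().isEmpty();
    }
}
```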
The full reference for instance.properties is at github.com/alibaba/can…
Check that the configuration took effect:
```shell
# restart canal first
docker restart canal1.1.5
docker exec -it canal1.1.5 /bin/bash
cd canal-server/logs/example/
tail -100f example.log   # watch the instance log
```
If the log shows no errors, canal-server has connected to MySQL, and from now on every data change in MySQL is picked up by Canal.
You can verify the connection with a small Java program:
```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
```
```java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
```
After you update data through a MySQL GUI client, the changes are printed in the IDEA console.
4. Integrate with Spring Boot
Custom client
1. Create a new Spring Boot project
Add the spring-boot-starter-data-elasticsearch and canal-spring-boot-starter dependencies:
```xml
<!-- Canal client -->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
<!-- Elasticsearch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
```
2. Edit application.yml
```yaml
spring:
  # Elasticsearch
  elasticsearch:
    uris: http://localhost:9200
    username: root
    password: 123456
  # database
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456

# canal server
canal:
  destination: example   # must match the instance name configured on the canal server
  server: 127.0.0.1:11111  # canal server address

# log level for canal client messages
logging:
  level:
    top.javatool.canal.client: warn
```
3. Create the entity classes the sync will use.
MySQL table entity:
```java
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;

/**
 * Post
 */
@TableName(value = "post")
@Data
public class Post implements Serializable {

    /** id */
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;

    /** title */
    private String title;

    /** content */
    private String content;

    /** tag list (JSON) */
    private String tags;

    /** like count */
    private Integer thumbNum;

    /** favourite count */
    private Integer favourNum;

    /** creator user id */
    private Long userId;

    /** creation time */
    private Date createTime;

    /** update time */
    private Date updateTime;

    /** logical-delete flag */
    @TableLogic
    private Integer isDelete;

    @TableField(exist = false)
    private static final long serialVersionUID = 1L;
}
```
Elasticsearch entity:
```java
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.song.search.model.entity.Post;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * ES wrapper for Post
 */
@Document(indexName = "post_v1")
@Data
public class PostEsDTO implements Serializable {

    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    /** id */
    @Id
    private Long id;

    /** title */
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String title;

    /** content */
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String content;

    /** tag list */
    @Field(type = FieldType.Keyword)
    private List<String> tags;

    /** creator user id */
    @Field(type = FieldType.Keyword)
    private Long userId;

    /** creation time */
    @Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)
    private Date createTime;

    /** update time */
    @Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)
    private Date updateTime;

    /** logical-delete flag */
    @Field(type = FieldType.Keyword)
    private Integer isDelete;

    private static final long serialVersionUID = 1L;

    private static final Gson GSON = new Gson();

    /**
     * Entity to ES wrapper.
     */
    public static PostEsDTO objToDto(Post post) {
        if (post == null) {
            return null;
        }
        PostEsDTO postEsDTO = new PostEsDTO();
        BeanUtils.copyProperties(post, postEsDTO);
        String tagsStr = post.getTags();
        if (StringUtils.isNotBlank(tagsStr)) {
            postEsDTO.setTags(GSON.fromJson(tagsStr, new TypeToken<List<String>>() {
            }.getType()));
        }
        return postEsDTO;
    }

    /**
     * ES wrapper to entity.
     */
    public static Post dtoToObj(PostEsDTO postEsDTO) {
        if (postEsDTO == null) {
            return null;
        }
        Post post = new Post();
        BeanUtils.copyProperties(postEsDTO, post);
        List<String> tagList = postEsDTO.getTags();
        if (CollectionUtils.isNotEmpty(tagList)) {
            post.setTags(GSON.toJson(tagList));
        }
        return post;
    }
}
```
4. Next, we implement the `EntryHandler` interface provided by canal-client to monitor the data table, so that inserts, updates, and deletes are synchronized to ES.
```java
import com.song.search.esdao.PostEsDao;
import com.song.search.model.dto.post.PostEsDTO;
import com.song.search.model.entity.Post;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

import javax.annotation.Resource;

@CanalTable("post") // name of the MySQL table to watch
@Component
@Slf4j
public class CanalSyncPostToEs implements EntryHandler<Post> {

    @Resource
    private PostEsDao postEsDao;

    /**
     * Invoked automatically when a row is inserted in MySQL
     *
     * @param post the inserted row
     */
    @Override
    public void insert(Post post) {
        PostEsDTO postEsDTO = PostEsDTO.objToDto(post);
        // index the new row in ES
        postEsDao.save(postEsDTO);
    }

    /**
     * Invoked automatically when a row is updated in MySQL
     *
     * @param before the row before the update
     * @param after  the row after the update
     */
    @Override
    public void update(Post before, Post after) {
        PostEsDTO postEsDTO = PostEsDTO.objToDto(after);
        // overwrite the ES document with the updated row
        postEsDao.save(postEsDTO);
    }

    /**
     * Invoked automatically when a row is deleted in MySQL
     *
     * @param post the deleted row
     */
    @Override
    public void delete(Post post) {
        // remove the corresponding document from ES
        postEsDao.deleteById(post.getId());
    }
}
```
Note: the ES DAO plays the same role for Elasticsearch that a MyBatis mapper plays for the database.
```java
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * ES repository for posts
 */
public interface PostEsDao extends ElasticsearchRepository<PostEsDTO, Long> {
}
```
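Because `ElasticsearchRepository` follows Spring Data conventions, query methods can be added simply by declaring them; Spring Data parses the method name and builds the ES query for you. A hypothetical sketch (these methods are illustrative, not part of the original project):

```java
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

public interface PostEsDao extends ElasticsearchRepository<PostEsDTO, Long> {

    // derived query: term lookup on the keyword field userId
    List<PostEsDTO> findByUserId(Long userId);

    // derived query: full-text match on the analyzed title field, paged
    Page<PostEsDTO> findByTitle(String title, Pageable pageable);
}
```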
Finally, test inserts, updates, and deletes against the database.
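For example, statements like the following can be run against MySQL (the column names are assumed to mirror the `Post` entity fields); after each statement, the `post_v1` index should reflect the change within a moment:

```sql
-- insert: a new document should appear in the post_v1 index
INSERT INTO post (title, content, tags, userId)
VALUES ('hello canal', 'sync test', '["java","es"]', 1);

-- update: the corresponding ES document should be overwritten
UPDATE post SET title = 'hello canal (updated)' WHERE title = 'hello canal';

-- delete: the ES document should be removed
DELETE FROM post WHERE title = 'hello canal (updated)';
```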
Summary
That completes the demonstration of syncing MySQL data to Elasticsearch with Canal. More complex sync logic can also be implemented by customizing the handler code, and the third-party `canal-spring-boot-starter` greatly reduces the effort of writing a Canal client by hand.
Unfortunately, the author of canal-spring-boot-starter has stopped maintaining it, and its latest release actually targets Canal 1.1.3; in practice, though, this does not prevent it from working with Canal 1.1.5. If you need higher performance from the Canal client, you can study the source code and build a deeply customized version.