名称:canal [kə'næl]
译意: 水道/管道/沟渠
语言: 纯java开发
定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
关键词: mysql binlog parser / real-time / queue&topic
本文中使用版本为 1.1.3,下载地址:https://github.com/alibaba/canal/releases,下载后解压目录如下:
1、将 serverMode 改为 kafka
# tcp, kafka, RocketMQ
canal.serverMode = kafka
2、配置 kafka server地址
canal.mq.servers = localhost:9092
这里需要注意配置文件中的 destinations 配置项:
canal.destinations = example
canal.conf.dir = ../conf
canal 监听的 mysql 示例在这里配置,上面的配置表示在安装目录的 conf 文件夹下,有一个 example 文件夹,代表有一个 example 实例,如有多个文件夹这里配置时需要用逗号分隔,该文件夹下有 instance.properties 配置文件用来配置需要监听的 mysql binlog。方便起见,本文中的所有 canal.instance.tsdb.enable 配置都为 false。
在开始之前,有一些准备工作需要做:
1、mysql 需开启 binlog,建议 Row 模式;
[root@canal /]# vi etc/my.cnf
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
2、canal 所使用的账户需要有 slave 权限
# 创建用户
mysql> CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
# 授权
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 查看用户权限
mysql> show grants canal
3、查看master status
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000021 | 4241 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
上面参考:https://segmentfault.com/a/1190000012862191
我们可以看到File:mysql-bin.000021 Position:4241 记住这两个值
配置instance.properties:
1、配置 mysql 信息以及 position
canal.instance.master.address=localhost:3306
canal.instance.master.journal.name=mysql-bin.000021
canal.instance.master.position=4241
2、配置用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 默认连接数据库,此项配置是无效的
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset=UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
3、配置需要监听的库和表
# table regex
canal.instance.filter.regex=test\..*
# table black regex
canal.instance.filter.black.regex=
默认监听所有库和表,上面配置表示监听 test 库下的所有表。
4、配置 kafka 接收数据 topic
canal.mq.topic=test
在数据库表中的数据有变化后,kafka 中相应的 topic 中会有对应的消息,示例数据如下
insert
{"data":[{"id":"1","name":"test"}],"database":"test","es":1559034949000,"id":4,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","label_name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"label","ts":1559034950327,"type":"INSERT"}
update
{"data":[{"id":"1","name":"test2"}],"database":"test","es":1559035012000,"id":9,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(255)"},"old":[{"label_name":"test"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"label","ts":1559035012345,"type":"UPDATE"}
end.
参考:
https://github.com/alibaba/canal/wiki/QuickStart
https://segmentfault.com/a/1190000012862191