简介

官方 wiki

名称:canal [kə'næl]

译意: 水道/管道/沟渠

语言: 纯java开发

定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql

关键词: mysql binlog parser / real-time / queue&topic

工作原理


配置方式

本文中使用版本为 1.1.3,下载地址:https://github.com/alibaba/canal/releases,下载后解压目录如下:


配置文件修改

canal.properties

1、将 serverMode 改为 kafka

# tcp, kafka, RocketMQ
canal.serverMode = kafka

2、配置 kafka server地址

canal.mq.servers = localhost:9092

这里需要注意配置文件中的 destinations 配置项:

canal.destinations = example
canal.conf.dir = ../conf

canal 监听的 mysql 示例在这里配置,上面的配置表示在安装目录的 conf 文件夹下,有一个 example 文件夹,代表有一个 example 实例,如有多个文件夹这里配置时需要用逗号分隔,该文件夹下有 instance.properties 配置文件用来配置需要监听的 mysql binlog。方便起见,本文中的所有 canal.instance.tsdb.enable 配置都为 false。

instance.properties

在开始之前,有一些准备工作需要做:

1、mysql 需开启 binlog,建议 Row 模式;

[root@canal /]# vi etc/my.cnf

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

2、canal 所使用的账户需要有 slave 权限

# 创建用户
mysql> CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
# 授权
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 查看用户权限
mysql> show grants canal

3、查看master status

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000021 |     4241 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

上面参考:https://segmentfault.com/a/1190000012862191

我们可以看到File:mysql-bin.000021 Position:4241 记住这两个值
配置instance.properties:

1、配置 mysql 信息以及 position

canal.instance.master.address=localhost:3306
canal.instance.master.journal.name=mysql-bin.000021
canal.instance.master.position=4241

2、配置用户名密码

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 默认连接数据库,此项配置是无效的
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset=UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

3、配置需要监听的库和表

# table regex
canal.instance.filter.regex=test\..*
# table black regex
canal.instance.filter.black.regex=

默认监听所有库和表,上面配置表示监听 test 库下的所有表。

4、配置 kafka 接收数据 topic

canal.mq.topic=test

kafka 数据示例

在数据库表中的数据有变化后,kafka 中相应的 topic 中会有对应的消息,示例数据如下

insert

{"data":[{"id":"1","name":"test"}],"database":"test","es":1559034949000,"id":4,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","label_name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"label","ts":1559034950327,"type":"INSERT"}

update

{"data":[{"id":"1","name":"test2"}],"database":"test","es":1559035012000,"id":9,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(255)"},"old":[{"label_name":"test"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"label","ts":1559035012345,"type":"UPDATE"}


end.

参考:

https://github.com/alibaba/canal/wiki/QuickStart

https://segmentfault.com/a/1190000012862191