Canal Basic Usage

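Prerequisites

MySQL is installed (I use 5.7) and a JDK is installed (canal depends on it).

Enable the MySQL binlog

Enable the binlog and set its format to ROW, so that the row-level content of insert, update, and delete operations can be read from it. Edit /etc/my.cnf and add the following under [mysqld]: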

log-bin=mysql-bin # adding this line enables the binlog
binlog-format=ROW # use ROW format
server_id=1 # must be unique; must not match the server_id of any other MySQL server in the cluster
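The three settings above are easy to mistype, so a quick sanity check on the file can help before restarting MySQL. The following is a minimal sketch, not part of MySQL or canal: `MyCnfBinlogCheck` is a hypothetical helper that reads the text of a my.cnf file and reports which of the settings are missing from [mysqld]. It assumes simple `key=value` lines and treats `-` and `_` in option names as equivalent, as MySQL does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of MySQL or canal.
final class MyCnfBinlogCheck {

    /** Collects key=value pairs from the [mysqld] section; '-' in keys is normalised to '_'. */
    static Map<String, String> mysqldSection(String myCnf) {
        Map<String, String> settings = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : myCnf.split("\\R")) {
            String line = raw.trim();
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash).trim(); // drop trailing comments
            }
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            int eq = line.indexOf('=');
            if (inMysqld && eq > 0) {
                String key = line.substring(0, eq).trim().replace('-', '_');
                settings.put(key, line.substring(eq + 1).trim());
            }
        }
        return settings;
    }

    /** Returns a list of problems; an empty list means the three settings look fine. */
    static List<String> problems(String myCnf) {
        Map<String, String> s = mysqldSection(myCnf);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) {
            problems.add("binlog-format is not ROW");
        }
        if (!s.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }
}
```

Passing the contents of /etc/my.cnf to `MyCnfBinlogCheck.problems(...)` should return an empty list once the three lines are in place. Options written without a value (a bare `log-bin`) are not recognised by this sketch.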

Verify that the binlog is enabled

Log in to MySQL and run:

show variables like 'log_%';

If log_bin shows ON, the binlog is enabled.
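The same check can be done from Java, which is handy if you want the client application to refuse to start against a misconfigured server. This is a sketch under assumptions: `BinlogVariableCheck` is a hypothetical class, the JDBC URL and credentials are whatever your environment uses, and a MySQL JDBC driver must be on the classpath for `readVariables` to work.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
final class BinlogVariableCheck {

    /** Reads log_bin and binlog_format from the server (requires a MySQL JDBC driver at runtime). */
    static Map<String, String> readVariables(String jdbcUrl, String user, String password)
            throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')";
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                // Column 1 is the variable name, column 2 its value
                vars.put(rs.getString(1).toLowerCase(), rs.getString(2));
            }
        }
        return vars;
    }

    /** True when the binlog is on and uses ROW format, as this guide sets it up. */
    static boolean readyForCanal(Map<String, String> vars) {
        return "ON".equalsIgnoreCase(vars.get("log_bin"))
                && "ROW".equalsIgnoreCase(vars.get("binlog_format"));
    }
}
```

A typical call would be `readyForCanal(readVariables("jdbc:mysql://127.0.0.1:3306/", "canal", "canal"))`, where the URL and account are placeholders for your own.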

Create a MySQL account for canal

Create a dedicated MySQL account for canal so that it can pull the MySQL binlog.

CREATE USER canal IDENTIFIED BY 'canal';
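-- canal only needs SELECT, REPLICATION SLAVE and REPLICATION CLIENT; ALL PRIVILEGES also works but is broader than necessary
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';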
FLUSH PRIVILEGES;

Check the privileges granted to the canal account

show grants for 'canal';
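To interpret the output of SHOW GRANTS programmatically, the lines can be scanned for the privileges canal relies on. The sketch below is illustrative only: `CanalGrantCheck` is a hypothetical helper, it assumes the privilege set SELECT, REPLICATION SLAVE, REPLICATION CLIENT (or ALL PRIVILEGES) granted globally on `*.*`, and it does simple string matching rather than real SQL parsing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for illustration only; matches SHOW GRANTS lines by plain string handling.
final class CanalGrantCheck {

    private static final List<String> REQUIRED =
            Arrays.asList("SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT");

    /** True if any global grant line covers the privileges assumed to be needed by canal. */
    static boolean sufficient(List<String> grantLines) {
        for (String line : grantLines) {
            String grant = line.trim().toUpperCase(Locale.ROOT);
            int on = grant.indexOf(" ON *.*");
            if (!grant.startsWith("GRANT ") || on < 0) {
                continue; // only look at global grants
            }
            String privileges = grant.substring("GRANT ".length(), on).trim();
            if (privileges.equals("ALL PRIVILEGES")) {
                return true;
            }
            List<String> granted = new ArrayList<>();
            for (String p : privileges.split(",")) {
                granted.add(p.trim());
            }
            if (granted.containsAll(REQUIRED)) {
                return true;
            }
        }
        return false;
    }
}
```

With the account created above, the single line `GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'` is enough for `sufficient(...)` to return true.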

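Download and extract canal

Releases are at https://github.com/alibaba/canal/releases . At the time of writing the stable release is v1.1.0; download canal.deployer-1.1.0.tar.gz and extract it into a canal directory (create the directory if it does not exist).

Note: canal is written in pure Java, so it needs a JDK. I am using 1.8.0_65-b17.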

# Download
wget https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.deployer-1.1.0.tar.gz
# Create the canal directory
mkdir canal
# Extract the downloaded archive into the canal directory
tar -zxvf canal.deployer-1.1.0.tar.gz -C canal

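canal and instance configuration files

One canal server can host multiple instances. Each instance monitors one MySQL instance, so several instances can cover MySQL instances on several servers. In other words, a single canal server can monitor MySQL across multiple machines in a sharded (split database, split table) setup.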
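The relationship between one canal server and its instances shows up in the directory layout: each instance name listed in `canal.destinations` has its own directory with an instance.properties file under the conf root. The sketch below just computes that mapping. `InstanceLayout` is a hypothetical helper, and the instance name `orders` in the usage note is made up for illustration (the default setup only has `example`).

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
final class InstanceLayout {

    /** Maps each comma-separated destination name to <confDir>/<name>/instance.properties. */
    static Map<String, Path> instanceConfigs(String confDir, String destinations) {
        Map<String, Path> result = new LinkedHashMap<>();
        for (String name : destinations.split(",")) {
            String destination = name.trim();
            if (!destination.isEmpty()) {
                result.put(destination, Paths.get(confDir, destination, "instance.properties"));
            }
        }
        return result;
    }
}
```

For example, `instanceConfigs("conf", "example,orders")` yields one entry per instance, each pointing at the file where that instance's MySQL address and credentials would be configured.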

(1) canal.properties

canal.properties in canal/conf is the global configuration of the canal server. Modify it as follows:

#################################################
######### common argument #############
#################################################
# id must be unique and must not be the same as the MySQL server_id
canal.id= 2
canal.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.zkServers=
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
# parallelThreadSize is commented out by default (original value 16); canal runs on a local VM with only 1 CPU, which caused an error, so it is set to 1 here
canal.instance.parser.parallelThreadSize = 1
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal

# rds oss binlog account
canal.instance.rds.accesskey =
canal.instance.rds.secretkey =

#################################################
######### destinations #############
#################################################
canal.destinations= example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

# position info; change this to your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

# username/password; change these to your own database account
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = test
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*
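The value of `canal.instance.filter.regex` looks odd because of escaping: in a Java properties file `\\` is read as a single backslash, so `.*\\..*` becomes the regular expression `.*\..*`, which matches any `schema.table` name. The sketch below illustrates this with `java.util.Properties` and `java.util.regex`. `FilterRegexDemo` is a hypothetical class, and the comma-separated matching is a simplified assumption about how such a filter can be applied, not canal's actual implementation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; canal's own filter may differ in details.
final class FilterRegexDemo {

    /** Loads properties text and returns the unescaped canal.instance.filter.regex value. */
    static String loadFilterRegex(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("canal.instance.filter.regex", "").trim();
    }

    /** True if any comma-separated pattern matches the whole "schema.table" name. */
    static boolean accepts(String filterRegex, String schemaDotTable) {
        for (String pattern : filterRegex.split(",")) {
            String p = pattern.trim();
            if (!p.isEmpty() && Pattern.matches(p, schemaDotTable)) {
                return true;
            }
        }
        return false;
    }
}
```

With the line from the configuration above, `accepts(loadFilterRegex(text), "test.student")` is true; narrowing the value to `test\\..*` in the file would restrict matching to tables in the test schema.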

(2) instance.properties

Located at canal/conf/example/instance.properties, this file configures one specific instance. Any setting not specified here is inherited from canal.properties. Its contents:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info; change address to your own MySQL address
canal.instance.master.address=192.168.204.128:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password; change these to the MySQL account and password created for canal
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# database to monitor
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset=UTF-8

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#################################################
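The way instance settings fall back to the global ones can be pictured with the defaults mechanism of `java.util.Properties`: a lookup first checks the instance file and only then the global file. The following is a sketch of that idea, not canal's own loading code, and `InstanceConfigInheritance` is a hypothetical class name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration only; it mimics the fallback idea, not canal's loader.
final class InstanceConfigInheritance {

    /** Loads properties text; keys missing from it are looked up in defaults (may be null). */
    static Properties load(String text, Properties defaults) {
        Properties props = defaults == null ? new Properties() : new Properties(defaults);
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

Loading canal.properties first and passing the result as `defaults` when loading instance.properties gives the behaviour described above: `canal.instance.master.address` resolves to the instance value 192.168.204.128:3306, while a key set only in the global file still resolves through the fallback.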

Create the test database

Check whether MySQL already has a test database; create it if not.

Start canal

Go to canal/bin and run ./startup.sh.

Use ps -ef|grep canal to verify that it is running.
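A running process does not guarantee that the server is accepting client connections, so it can also be useful to probe the port configured as `canal.port` (11111 in the configuration above). The sketch below does a plain TCP connect with a timeout. `CanalPortCheck` is a hypothetical helper and only tests reachability, not that the listener is really canal.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration only; it checks TCP reachability, nothing canal-specific.
final class CanalPortCheck {

    /** True if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unreachable
        }
    }
}
```

For the setup in this guide the call would be `CanalPortCheck.isListening("192.168.204.128", 11111, 2000)`, with the host replaced by wherever your canal server runs.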

Java client code

Create a Spring Boot project and add the dependency:

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>

Create the TestCanal class:

package com.xbq.canal.test;

import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * @Author: xbq
 * @Date: 2018/9/11 19:16
 * @Description:
 */
public class TestCanal {

    public static void main(String[] args) throws InterruptedException {
        // Step 1: connect to the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.204.128", 11111), "example", "", "");
        connector.connect();
        // Step 2: subscribe
        connector.subscribe();
        // Step 3: poll in a loop
        while (true) {
            try {
                // Fetch up to 1000 entries per batch, without acknowledging them yet
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    System.out.println("No data at the moment");
                    Thread.sleep(1000);
                } else {
                    System.out.println("-------------------------- data received -----------------------");
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch so the next one can be fetched
                connector.ack(batchId);
            } catch (Exception e) {
                // Do not swallow errors silently; at least log them
                e.printStackTrace();
            } finally {
                Thread.sleep(1000);
            }
        }
    }

    /**
     * Prints every row-data entry in the batch.
     * @param entries entries of one batch
     */
    public static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Step 1: unpack the entry
            Header header = entry.getHeader();
            EntryType entryType = entry.getEntryType();

            // Step 2: only ROWDATA entries carry the data we are interested in
            if (entryType != EntryType.ROWDATA) {
                continue;
            }
            String tableName = header.getTableName();
            String schemaName = header.getSchemaName();
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                // Skip entries that cannot be parsed instead of hitting a NullPointerException below
                e.printStackTrace();
                continue;
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("Currently operating on %s.%s, Action= %s", schemaName, tableName, eventType));

            // For QUERY or DDL events, just print the SQL and move on to the next entry
            if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                System.out.println("rowchange sql ----->" + rowChange.getSql());
                continue;
            }
            // Step 3: drill down to column level
            rowChange.getRowDatasList().forEach((rowData) -> {
                // Columns before the change
                List<Column> beforeColumns = rowData.getBeforeColumnsList();
                // Columns after the change
                List<Column> afterColumns = rowData.getAfterColumnsList();
                if (eventType == EventType.DELETE) {
                    // A delete only has a before image
                    printColumn(beforeColumns);
                } else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
                    // Inserts and updates are printed from the after image
                    printColumn(afterColumns);
                }
            });
        }
    }

    /**
     * Prints each column of a row and whether it was changed.
     * @param columns columns of one row
     */
    public static void printColumn(List<Column> columns) {
        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            String columnType = column.getMysqlType();
            // Whether this column was updated
            boolean isUpdated = column.getUpdated();
            System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName,
                    columnValue, columnType, isUpdated));
        });
    }
}

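Run this class. Create a student table in the MySQL test database, then insert, update, and delete some rows; the console prints the data-received banner followed by the affected columns.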
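For an UPDATE, the client above prints the after image and relies on the isUpdated flag to show which columns changed. The same information can be derived by comparing the before and after images of a row. The sketch below shows that comparison on plain maps of column name to value, which stand in for the two column lists. `RowDiff` is a hypothetical helper and does not use the canal API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper for illustration only; maps stand in for canal's before/after column lists.
final class RowDiff {

    /** Returns the names of columns whose value in 'after' is new or differs from 'before'. */
    static List<String> changedColumns(Map<String, String> before, Map<String, String> after) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : after.entrySet()) {
            String name = entry.getKey();
            if (!before.containsKey(name) || !Objects.equals(before.get(name), entry.getValue())) {
                changed.add(name);
            }
        }
        return changed;
    }
}
```

If a student row changes only its age, `changedColumns` returns just that column name, which corresponds to the columns the client would print with isUpdated=true.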

