使用Kafka订阅PolarDB-X CDC
本小节介绍如何通过Canal订阅PolarDB-X增量数据,投递至Kafka进行消费。
演示环境说明
建议通过MacOS或者Linux机器来进行操作。
环境版本说明:
实例 | 版本说明 | 官方网站 |
---|---|---|
PolarDB-X | 2.0.1 | PolarDB-X |
Kafka | 2.13 | Kafka |
Canal | 1.1.5 | Canal |
准备PolarDB-X
如果您已经安装了Docker,请执行以下脚本完成单机版PolarDB-X的安装,该过程大概需要1-2分钟。
# 获取PolarDB-X镜像
docker pull polardbx/polardb-x:2.0.1
# 启动PolarDB-X, 并暴露8527端口, 这里可能需要1-2分钟
docker run -d --name polardbx-play -p 8527:8527 polardbx/polardb-x
# 通过MySQL客户端验证启动
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
注意:PolarDB-X集群部署方式有PXD、Kubernetes等,详情请参见快速入门。
准备数据
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
-- 创建测试库
create database canal;
use canal;
-- 创建测试表
create table `trades` (
id integer auto_increment NOT NULL,
shop_id integer comment '店铺id',
pay_amount decimal comment '支付金额',
stat_date date comment '统计时间',
primary key(id)
);
-- 写入数据
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
insert trades values(default, 1001, 10, '2022-03-15');
准备Kafka
下载最新的Kafka安装包并解压。启动ZooKeeper和Kafka,等待Kafka服务器启动完成。
# 解压
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0
# 启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动Kafka
bin/kafka-server-start.sh config/server.properties
准备Canal
下载并安装Canal
下载最新的Canal安装包。
# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
# 解压
tar -xzf canal.deployer-1.1.5.tar.gz
修改配置文件
Canal的官方配置文件有很多配置项,其中需要改动配置如下:
- 编辑
vi conf/canal/canal.properties
,用于探测Canal实例配置:
canal.instance.tsdb.enable = false
canal.destinations = example
canal.conf.dir = ../conf
# 配置Kafka连接信息
kafka.bootstrap.servers = 127.0.0.1:9092
- 编辑
vi conf/example/instance.properties
,订阅PolarDB-X增量Binlog,并写入Kafka:
# 配置Canal源为PolarDB-X的连接信息,用于订阅PolarDB-X的Binlog
canal.instance.tsdb.enable=false
canal.instance.master.address=127.0.0.1:8527
canal.instance.dbUsername=polardbx
canal.instance.dbPassword=123456
# 定义写入Kafka的topic
canal.mq.topic=example
启动Canal
# 启动Canal
./bin/startup.sh
# 查看Canal日志
tail -f logs/canal/canal.log
# 查看Canal instance日志
tail -f logs/example/example.log
说明:如果收到如下图所示的警告信息,可以忽略,不影响正常运行。
消费Kafka Topic
Kafka Consumer消费Topic
# 订阅topic为”example“的消息
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server localhost:9092
再次写入一些数据:
-- 登录PolarDB-X
mysql -h127.1 -P8527 -upolardbx_root -p"123456"
insert trades values(default, 1001, 10, '2022-03-15');
Kafka消息的数据结构示例
{
"data":[
{
"id":"100008",
"shop_id":"1001",
"pay_amount":"10",
"stat_date":"2022-03-15"
}
],
"database":"canal",
"es":1647950609000,
"id":4,
"isDdl":false,
"mysqlType":{
"id":"int(11)",
"shop_id":"int(11)",
"pay_amount":"decimal(10,0)",
"stat_date":"date"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"shop_id":4,
"pay_amount":3,
"stat_date":91
},
"table":"trades",
"ts":1647950609988,
"type":"INSERT"
}