canal同步mysql数据到es中

cannl同步mysql数据到es中

canal组件介绍

canal-admin
(非必须但推荐使用):为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

canal-server:
服务端,从mysql读取binlog日志获取增量日志,可以通过tcp、kafka、RocketMQ等方式与客户端通信;通过zookeeper搭建集群。

canal-adapter
客户端,根据canal-server获取的增量日志执行适配到其他诸如elasticsearch、redis、mysql等端,实现数据同步。


本次实验环境

首先我们需要下载canal的各个组件canal-servercanal-adaptercanal-admin,下载地址https://github.com/alibaba/canal/releases

es及相关工具的部署可参考Elasticsearch详解及部署

1、开启mysql的binlog

使用canal-server需要先准备mysql,对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式

vi /etc/my.cnf

[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 

配置完成后重启mysql,并查询是否配置生效:ON就是开启

systemctl restart mysqld

可以进入数据库再次查看下

mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.01 sec)  mysql> show variables like 'binlog_format%'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.00 sec)

2、mysql用户数据准备

 创建一个canal用户并对其进行授权

CREATE USER canal IDENTIFIED BY 'Cjz123456.'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES; 

创建一个测试用户数据库及表

create database canal;
USE canal; CREATE TABLE product ( id bigint(20) NOT NULL AUTO_INCREMENT, title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, sub_title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, price decimal(10, 2) NULL DEFAULT NULL, pic varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (id) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

3、创建存放canal的目录  

mkdir /usr/local/canal-server

mkdir /usr/local/canal-adpter  

4、部署canal-server

tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-server

vim /usr/local/canal-server/conf/example/instance.properties

# 需要同步数据的MySQL地址 canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # 用于同步数据的数据库账号 canal.instance.dbUsername=canal # 用于同步数据的数据库密码 canal.instance.dbPassword=Cjz123456. # 数据库连接编码 canal.instance.connectionCharset = UTF-8 # 需要订阅binlog的表过滤正则表达式 canal.instance.filter.regex=.*\\..*

cd /usr/local/canal-server/bin

./startup.sh

查看日志查看是否正常启动

cd /usr/local/canal-server/logs/

tail -f example/example.log

tail -f canal/canal.log

5、部署canal-adapter

tar -zxvf canal.adapter-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-adpter/

cd /usr/local/canal-adpter/conf

vim application.yml

server:   port: 8081 spring:   jackson:     date-format: yyyy-MM-dd HH:mm:ss     time-zone: GMT+8     default-property-inclusion: non_null  canal.conf:   mode: tcp # 客户端的模式,可选tcp kafka rocketMQ   flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效   zookeeperHosts:    # 对应集群模式下的zk地址   syncBatchSize: 1000 # 每次同步的批数量   retries: 0 # 重试次数, -1为无限重试   timeout: # 同步超时时间, 单位毫秒   accessKey:   secretKey:   consumerProperties:     canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址     canal.tcp.zookeeper.hosts:     canal.tcp.batch.size: 500     canal.tcp.username:     canal.tcp.password:   srcDataSources:     defaultDS:       url: jdbc:mysql://192.168.111.129:3306/canal?useUnicode=true&charaterEncoding=utf-8&useSSL=false       username: canal       password: Cjz123456.   canalAdapters:   - instance: example # canal instance Name or mq topic name     groups:     - groupId: g1       outerAdapters:       - name: logger       - name: es7         hosts: http://192.168.111.129:9200 # 127.0.0.1:9200 for rest mode         properties:           mode: rest # or rest           security.auth: elastic:elastic #  only used for rest mode           cluster.name: my-es 

修改 canal-adapter/conf/es7/mytest_user.yml 文件,用于配置MySQL中的表与Elasticsearch中索引的映射关系

vim es7/mytest_user.yml

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: example  # canal的instance或者MQ的topic groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping:   _index: canal_product # es 的索引名称   _id: _id  # 将Mysql表里的id对应上es上的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配   sql: SELECT          p.id AS _id,          p.title,          p.sub_title,          p.price,          p.pic         FROM          product p        # sql映射   etlCondition: where p.id>={}   #etl的条件参数   commitBatch: 3000   # 提交批大小 

cd /usr/local/canal-adpter/bin/

./startup.sh

查看日志看是否正常启动 

cd /usr/local/canal-adpter/logs/adapter/

tail -f adapter.log

6、创建ES索引

可以直接通过Kibana页面进行创建,不难看出,索引的属性对应着就是我们表的属性(Mysql表id与es的_id做了对应,所以这里不需要设置)

PUT canal_product {   mappings: {     properties: {       title: {         type: text       },       sub_title: {         type: text       },       pic: {         type: text       },       price: {         type: double       }     }   } } 

7、验证

canal初始化是不会全量同步数据的,所以我们需要登录mysql,插入一条数据

INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 15, '小米8', ' 全面屏游戏智能手机 6GB+64GB', 1999.00, NULL );

创建完成后在Kibana上查看即可发现数据已经同步

GET canal_product/_search

在elasticsearch-head上查看也是,可以看出,我们的表id变成了es的_id

在Mysql上修改数据看看

UPDATE product SET title='小米10' WHERE id=15;

 可以看到我们修改的数据也进行了同步

 


该文章参考于:https://blog.csdn.net/zh1998wx/article/details/123101442