ZWZhangYu

V1

2022/08/02阅读:31主题:默认主题

alibaba数据同步组件canal的实践整理

@

认识

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 基于日志增量订阅和消费的业务包括 【1】数据库镜像 【2】数据库实时备份 【3】索引构建和实时维护(拆分异构索引、倒排索引等) 【4】业务 cache 刷新 【5】带业务逻辑的增量数据处理 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

基本原理

MySQL 主备复制原理

在这里插入图片描述
在这里插入图片描述

【1】MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看) 【2】MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) 【3】MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

【1】canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议 【2】MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) 【3】canal 解析 binary log 对象(原始为 byte 流) 在这里插入图片描述 Canal 主要用途是对 MySQL 数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对 MySQL 的增量数据进行实时同步,支持同步到 MySQL、Elasticsearch、HBase 等数据存储中去。

部署参考

具体的部署操作步骤参见: https://github.com/alibaba/canal/wiki/QuickStart 这边使用的是 Docker 进行容器化部署,从 DockerHub 上找到了 canal-server 的容器镜像,使用版本是 V1.1.5 版本。 canal/canal-server:v1.1.5 在这里插入图片描述 需要对外的端口是 11111,上面映射到主机的端口是 30000。 数据卷配置的方式使用的是主机映射卷,需要映射两个配置文件. 在这里插入图片描述 其中两个挂载对应 instance.properties 配置文件和 canal.properties 配置文件。

instance.properties 配置文件

instance.properties 配置文件 instance.properties 配置文件,该配置文件主要是配置监听的 MySQL 实例的相关属性。 在这里插入图片描述 position info 主要配置了连接的 MySQL 地址,binlog 日志名称,日志位置,时间戳。中间两个信息可以通过在 MySQL 中通过 show master status 命令查看 在这里插入图片描述 时间戳可以以当前时间为准,精确到毫秒级别。 在这里插入图片描述 canal.instance.XXX 指定了连接到对应数据库的账号密码等信息,该信息对应的就是官方文档的下面这一步操作。 在这里插入图片描述 canal.instance.filter.regex 表示表名称的过滤规则,即希望当前数据库实例那些表可以被监听到。这个可以在配置文件中配置,也可以在客户端代码中配置,这里还是要求在配置文件中构建,后面的连接方式没有使用客户端连接。 举例:全库全表:.\.. 指定某个库全表:test.._ ,匹配库名 test 下所有表 单表:test.user ;匹配库名 test 下 user 表 多规则组合使用: test.._,test2.user1,匹配 test 库下所有表以及匹配 test2 库下 usee1 表

修改客户端代码(Java)

修改 java 程序下 connector.subscribe 配置的过滤正则 全库全表 connector.subscribe(".\..") 指定库全表 connector.subscribe("test\..") 单表 connector.subscribe("test.user") 多规则组合使用 connector.subscribe("test\..,test2.user1,test3.user2")

canal.properties 配置文件

canal.properties 配置文件主要是描述 canal 服务的属性,开放的端口,连接的 MQ,admin 配置(该组件本次未使用) 配置消息队列 在刚开始是直接通过客户端连接到 Canal 服务器消费信息的,但是这样会存在一些问题,客户端因为重启或者其他原因不可用时会导致数据丢失,考虑到数据一致和完整性,增加了 RabbitMQ 作为消息传递中间件。 配置方式,修改 confg/canal.properties 添加 MQ 的配置,canal 支持 kafka、RocketMQ、RabbitMQ。因为项目中使用 RabbitMQ 比较多,也就选择了 RabbitMQ。

配置方式参考如下,需要提前创建 exchange、queue,并根据 MQ 的配置填写 host/username/password 信息。 在这里插入图片描述 配置完成后重启容器,并在数据库执行几条 DML 语句,观察 MQ 是否有消息进来。 在这里插入图片描述 可以在获取一条信息查看明细 在这里插入图片描述

相关产品对比

类似的监听数据库变更的组件有 MaxWell、canal 等,因为之前有使用过 canal 更熟悉一些所以也就选择了这个组件。 在这里插入图片描述

实践经验分享

Client 案例

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑

Java 版本的参考链接 https://github.com/alibaba/canal/wiki/ClientExample

 <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>

下面提供一个简单的 SpringBoot 示例


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CannalClient implements InitializingBean {

    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.208.31"11111), "example""canal""canal");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    printEntry(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                //如果是删除语句
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增语句
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的语句
                } else {
                    //变更前的数据
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //变更后的数据
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}


消息队列消息示例

下面简单展示下监听到消息队列的消息内容,这里以一个测试表为例,展示下新增、删除、更新时接收到数据内容

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE
=InnoDB DEFAULT CHARSET=utf8mb4;

新增消息

{
 "data": [{
  "id""1",
  "name""测试",
  "phone""12345678"
 }],
 "database""db",
 "es"1658225090000,
 "id"7326,
 "isDdl"false,
 "mysqlType": {
  "id""int",
  "name""varchar(255)",
  "phone""varchar(255)"
 },
 "old"null,
 "pkNames": ["id"],
 "sql""",
 "sqlType": {
  "id"4,
  "name"12,
  "phone"12
 },
 "table""test",
 "ts"1658225090766,
 "type""INSERT"
}

更新操作

{
 "data": [{
  "id""1",
  "name""测试更新",
  "phone""12345678"
 }],
 "database""db",
 "es"1658225185000,
 "id"7333,
 "isDdl"false,
 "mysqlType": {
  "id""int",
  "name""varchar(255)",
  "phone""varchar(255)"
 },
 "old": [{
  "name""测试"
 }],
 "pkNames": ["id"],
 "sql""",
 "sqlType": {
  "id"4,
  "name"12,
  "phone"12
 },
 "table""test",
 "ts"1658225185768,
 "type""UPDATE"
}

删除操作

{
 "data": [{
  "id""1",
  "name""测试更新",
  "phone""12345678"
 }],
 "database""db",
 "es"1658225230000,
 "id"7337,
 "isDdl"false,
 "mysqlType": {
  "id""int",
  "name""varchar(255)",
  "phone""varchar(255)"
 },
 "old"null,
 "pkNames": ["id"],
 "sql""",
 "sqlType": {
  "id"4,
  "name"12,
  "phone"12
 },
 "table""test",
 "ts"1658225230416,
 "type""DELETE"
}

批量插入 10000 条数据检查数据完整性

考虑到数据的完整性,当前测试快速插入一万条数据进行测试,看最终的数据监听情况。

CREATE DEFINER=`root`@`%` PROCEDURE `batchInsertUserTest`()
BEGIN

 declare i int;
    set i=1;
    while i<=10000 do
INSERT INTO  -- INSERT测试语句
set i=i+1;
    end while;
END

执行存储过程 call batchInsertUserTest(); 测试结果,经过测试观察可以看到数据处理正常,当前的表是 canal 监听到数据并写入的业务数据表 在这里插入图片描述

修改关联表结构测试

ALTER TABLE  `tableName`
ADD COLUMN `temp` varchar(255) NULL AFTER `status`;
在这里插入图片描述
在这里插入图片描述

经过测试发现当前版本是可以监听到 DDL 的变更的,注意这个问题在老版本中是存在一些问题的,可能出现本地缓存的字段列数和当前列数不一致的情况,在创建一个新表的时候,观察日志是可以看到他会将表结构放到本地缓存中的。

数据压缩和表结构设计

如果使用 canal 记录监听信息,需要注意 MySQL 变更日志数据量是很大的,要根据具体的情况进行筛选和丢弃。在刚开始时,我并没有注意到这个问题,导致几天时间内就积攒了几百万的数据,而且其中还有一些 json 的大文本字段,结果就导致数据备份程序因为空间不够出错。 1:可以适当的进行筛选和表的过滤,只记录需要的数据 2:如果表只做查询操作,可以适当的对一些大文本字段进行压缩,或者对表空间进行压缩,减少占用空间

事务回滚是否会还会监听到数据

考虑到业务侧存在因异常事务回滚的情况,所以进行了下面的手动事务开启和回滚的操作,经过测试事务回滚并不会触发。

START TRANSACTION;
INSERT INTO ...
ROLLBACK ;

通过事物添加记录,如果出现异常回滚,MYSQL binlog 不会记录删除记录。

单个 INSERT 多个数据生成的消息说明

canal1.1.5 mysql 批量更新或批量插入,canal 生成的消息只有一条 insert into exam.canal_test(name) values ('232323232'),('ttttttt'),('ooooooo');

针对上面的情况,需要在写代码的时候注意使用集合进行处理,避免出现问题。

低概率数据丢失问题

在浏览 GitHub 的 issues 时发现有部分用户反映 canal 有概率丢失数据,这个我在实际的使用中未发现,但是通过下面描述可能是存在低概率的数据丢失问题,这个在使用过程中需要注意,对于核心的业务处理需要做好补偿操作。 在这里插入图片描述

分类:

后端

标签:

后端

作者介绍

ZWZhangYu
V1