首页 > 基础资料 博客日记

java整合Canal实现数据库监听(附完整的踩坑总结)

2024-09-12 22:00:07基础资料围观104

文章java整合Canal实现数据库监听(附完整的踩坑总结)分享给大家,欢迎收藏Java资料网,专注分享技术知识

1. 准备工作

1.1. MySQL开启binlog日志

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

修改完成后输入: show variables like 'log_bin';查看binlog是否生效

show variables like 'log_bin';

1.2. 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

-- 创建一个用户名密码均为canal的账户
CREATE USER canal IDENTIFIED BY 'canal';  
-- 授予用户'canal'在所有数据库和表上进行SELECT查询以及复制从库和复制客户端的权限,并且允许该用户从任何主机连接到数据库服务器。
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES; 

2. 正式开始

2.1. 采用doker部署canal-server

2.1.1. 使用docker拉取最新版canal服务端
docker pull canal/canal-server:latest
2.1.2. 使用docker run命令创建容器并挂载配置文件所在的数据卷

# -v 本地的instance.properties:容器的instance.properties将容器的instance.properties配置文件挂载到宿主机,方便后续变更
docker run \
--name mycanal \
-p 11111:11111 \
-v /tmp/canal/conf2/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
 -d \
 canal/canal-server

2.2. 修改配置文件

进入数据卷挂载的目录新建一个instance.properties文件 修改配置如下

## mysql serverId 不能和mysql相同
canal.instance.mysql.slaveId = 1234  
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息 此处采用刚刚创建的用户
canal.instance.dbUsername = canal   
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#需要改成同步的数据库表规则
canal.instance.filter.regex = .\*\\\\..\*

配置参考 此处引用csdn博客

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:

  1. 所有表:.   or  .\..*
  2. canal schema下所有表: canal\..*
  3. canal下的以canal打头的表:canal\.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
    注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

修改完成后重启 canal容器即可。

docker restart  mycanal;

至此,canal服务端就算配置完成

3. java示例

3.1. 导入相关maven依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

3.2. 创建官方示例代码


import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

    
  

}

3.3. 运行Client

首先启动Canal Server

启动Canal Client后,可以从控制台从看到类似消息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

此时代表当前数据库无变更数据

改改数据库,如修改或添加数据时,会出现如下提示:

================&gt; binlog[binlog.000002:7402] , name[mydb,asd] , eventType : UPDATE
-------&gt; before
123 : 123    update=false
-------&gt; after
123 : 1233    update=true

出现如上消息代表数据库被改变 至此,恭喜你成功配置.

4. 踩坑总结

4.1. java客户端无法连接到canal的服务端

如下图所示:

需要检查下地址的配置,确认能ping通

如果可以ping通但是仍然无法连接,查看是否是虚拟机的防火墙是否打开,11111端口可能被拦截。

4.2. java客户端连接到了canal但是修改数据库无日志打印

4.2.1. 原因1: canal无法连接到MySQL
  1. 首先检查数据库中是否有配置文件配置的账户和密码,如本文中的 账号:canal 密码:canal
  2. 如果是MySQL 8.0以上,需要修改身份验证插件
## 自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
  1. 如果是在docker中部署的MySQL,需要注意此时的配置文件 canal.instance.master.address = 127.0.0.1:3306 的地址需要是docker的地址,通过 ifconfig查看

4.2.2. 原因2: MySQL的binlog日志未开启,此处参考第一步开启
4.2.3. 原因3: MySQL的binlog日志的binlog-format非ROW模式,修改即可

4.3. 其他原因,手动查看canal的日志,排查问题

输入 docker ps 查看正在运行的容器

[root@localhost ~]# docker ps
CONTAINER ID   IMAGE                COMMAND                   CREATED          STATUS          PORTS                                                                           NAMES
b4b3328b1a08   canal/canal-server   "/alidata/bin/main.s…"   55 minutes ago   Up 50 minutes   9100/tcp, 11110/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp, :::11111->11111/tcp   mycanal2
c34cdcfea74e   mysql                "docker-entrypoint.s…"   2 hours ago      Up 2 hours      0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                            mymysql
460bf1f5daaf   canal/canal-server   "/alidata/bin/main.s…"   4 hours ago      Up 4 hours      9100/tcp, 11110-11112/tcp                                                       zealous_hermann

找到canal的容器id 输入 docker exec -it [容器id] /bin/bash 进入容器内部

进入 /canal-server/logs/example目录输入 cat example.log 查看日志根据具体内容排查

例如:

参考文档:

java整合canal 实现数据同步_java 融合canal-client-CSDN博客

canal官方wiki


文章来源:https://blog.csdn.net/2301_77516476/article/details/138012688
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云