CDC 监听 Binlog

Siona

Debezium 监听 MySQL Binlog

1. pom.xml 导入依赖


<properties>
    <debezium.version>2.3.4.Final</debezium.version>
</properties>

<dependencies>
<!--    监听binlog    -->
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${debezium.version}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
</dependency>
</dependencies>

2. Debezium 配置文件

application.yml

# Debezium CDC 监听 binlog
debezium:
  connector:
    name: my-connector-001
    class: io.debezium.connector.mysql.MySqlConnector
    tasks.max: 1

  database:
    hostname: 127.0.0.1
    port: 3306
    user: root
    password: 123456
    server:
      id: 1234
      name: my-server-1
    include:
      list: "dev_ahzilai"

  table:
    include:
      list: "dev_ahzilai.scm_contract_contract, dev_ahzilai.scm_contract_template"
  schemas:
    enable: false
  include:
    schema:
      changes: false
  schema:
    history:
      class: io.debezium.storage.file.history.FileSchemaHistory
      file: tmp/history/db-history.dat
  offset:
    storage:
      file: tmp/offsets/offset.dat
    flush:
      interval: 60000
  topic:
    prefix: "my-app-connector"

DebeziumProperties.java 配置类

package com.siona.dao.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Debezium configuration holder.
 *
 * <p>Each field is populated by Spring from the {@code debezium.*} property named in its
 * {@code @Value} placeholder; Lombok's {@code @Data} generates the accessors.
 *
 * @Description TODO
 * @Version 1.0.0
 * @Date 2023/12/18 16:02
 * @Created by Siona
 */
@Data
@Component
public class DebeziumProperties {

    // Bound from debezium.database.hostname.
    @Value("${debezium.database.hostname}")
    private String hostname;

    // Bound from debezium.database.port (kept as a String).
    @Value("${debezium.database.port}")
    private String port;

    // Bound from debezium.database.user.
    @Value("${debezium.database.user}")
    private String user;

    // Bound from debezium.database.password.
    @Value("${debezium.database.password}")
    private String password;

    // Bound from debezium.database.server.id.
    @Value("${debezium.database.server.id}")
    private String serverId;

    // Bound from debezium.database.server.name.
    @Value("${debezium.database.server.name}")
    private String serverName;

    // Bound from debezium.connector.name.
    @Value("${debezium.connector.name}")
    private String connectorName;

    // Bound from debezium.connector.class (fully qualified connector class name).
    @Value("${debezium.connector.class}")
    private String connectorClass;

    // Bound from debezium.offset.storage.file (path of the offset file).
    @Value("${debezium.offset.storage.file}")
    private String offsetStorageFileName;

    // Bound from debezium.offset.flush.interval as a String.
    // NOTE(review): the same property is also bound to flushInterval (Long) below — confirm
    // whether both fields are needed.
    @Value("${debezium.offset.flush.interval}")
    private String offsetFlushInterval;

    // Bound from debezium.database.include.list.
    @Value("${debezium.database.include.list}")
    private String databaseIncludeList;

    // Bound from debezium.table.include.list.
    @Value("${debezium.table.include.list}")
    private String tableIncludeList;

    // Bound from debezium.schemas.enable.
    @Value("${debezium.schemas.enable}")
    private String schemasEnable;

    // Bound from debezium.include.schema.changes.
    @Value("${debezium.include.schema.changes}")
    private String includeSchemaChanges;

    // Bound from debezium.schema.history.class (fully qualified schema-history class name).
    @Value("${debezium.schema.history.class}")
    private String schemaHistoryClass;

    // Bound from debezium.schema.history.file (path of the schema-history file).
    @Value("${debezium.schema.history.file}")
    private String schemaHistoryFile;

    // Bound from debezium.offset.flush.interval, converted to Long (same key as
    // offsetFlushInterval above).
    @Value("${debezium.offset.flush.interval}")
    private Long flushInterval;

    // Bound from debezium.topic.prefix.
    @Value("${debezium.topic.prefix}")
    private String topicPrefix;
}

3. Debezium 监听 MySQL 配置类

后期考虑拆分 connect 为 start 等方法

MySQLBinlogConfig.java

package com.siona.dao.config;

import com.qx.dao.listener.BinlogUpdateEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Description TODO
 * @Version 1.0.0
 * @Date 2023/12/18 16:00
 * @Created by Siona
 */
@Configuration
@Slf4j
@Data
public class MySQLBinlogConfig {


    @Autowired
    private DebeziumProperties debeziumProperties;

    @Resource
    ApplicationContext applicationContext;

    DebeziumEngine<ChangeEvent<String, String>> engine;

    @Bean
    public void debeziumConfig() {
        // Define the configuration for the embedded and MySQL connector ...
        io.debezium.config.Configuration config = io.debezium.config.Configuration.create()
                /* begin engine properties */
                .with("connector.class", debeziumProperties.getConnectorClass())
                .with("offset.storage", FileOffsetBackingStore.class.getCanonicalName())
                .with("offset.storage.file.filename", debeziumProperties.getOffsetStorageFileName())
                .with("offset.flush.interval.ms", debeziumProperties.getFlushInterval())
                .with("schemas.enable", debeziumProperties.getSchemasEnable())
                .with("include.schema.changes", debeziumProperties.getIncludeSchemaChanges())
                /* begin connector properties */
                .with("name", debeziumProperties.getConnectorName())
                .with("database.hostname", debeziumProperties.getHostname())
                .with("database.port", debeziumProperties.getPort())
                .with("database.user", debeziumProperties.getUser())
                .with("database.password", debeziumProperties.getPassword())
                .with("database.include.list", debeziumProperties.getDatabaseIncludeList())
                .with("table.include.list", debeziumProperties.getTableIncludeList())
                .with("database.server.id", debeziumProperties.getServerId())
                .with("database.server.name", debeziumProperties.getServerName())
                .with("topic.prefix", debeziumProperties.getTopicPrefix())
                .with("schema.history.internal", debeziumProperties.getSchemaHistoryClass())
                .with("schema.history.internal.file.filename", debeziumProperties.getSchemaHistoryFile())
                .build();

        try {
            // 2. 构建 DebeziumEngine
            // 使用 Json 格式
            engine = DebeziumEngine.create(Json.class)
                    .using(config.asProperties())
                    .notifying(record -> {
                        applicationContext.publishEvent(new BinlogUpdateEvent(record.value()));
                    })
                    .using((success, message, error) -> {
                        if (!success && error != null) {
                            log.error("Debezium 报错 Message:{}", message);
                            log.error("Debezium 报错 Error:{}", error.getMessage());
                            log.error("Debezium 重连中...");
                            connect(this.engine);
                        }
                    })
                    .build();
            connect(engine);
        } catch (Exception e) {
            log.error("Debezium 启动失败:{}", e.getMessage());
            connect(this.engine);
        }
    }


    public void connect(DebeziumEngine<ChangeEvent<String, String>> engine){
        // Run the engine asynchronously ...
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        log.info("Debezium 已启动...");
    }

}
package com.siona.dao.listener;

import org.springframework.context.ApplicationEvent;

/**
 * Spring application event whose source is a {@code String}.
 *
 * <p>Within this project it is published with the JSON text of a Debezium change record and
 * consumed by a listener that casts {@link #getSource()} back to {@code String}.
 *
 * @Description TODO
 * @Version 1.0.0
 * @Date 2023/11/05 17:38
 * @Created by Siona
 */
public class BinlogUpdateEvent extends ApplicationEvent {

    /**
     * Creates the event.
     *
     * @param source the string stored as the event source; passed unchanged to
     *               {@link ApplicationEvent}
     */
    public BinlogUpdateEvent(String source) {
        super(source);
    }
}
package com.siona.dao.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * 同步事件监听
 *
 * @Description TODO
 * @Version 1.0.0
 * @Date 2023/11/05 17:37
 * @Created by Siona
 */
@Component
@Slf4j
public class BinlogUpdateListener implements ApplicationListener<ApplicationEvent> {


    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof BinlogUpdateEvent) {
            // binlog 更新事件
            BinlogUpdateEvent binlogUpdateEvent = (BinlogUpdateEvent) event;
            String binlogJson = (String) binlogUpdateEvent.getSource();
            binlogUpdate(binlogJson);
        }
    }

    /**
     * 处理 binlog 更新
     *
     * @param binlogJson json 字符串
     */
    private void binlogUpdate(String binlogJson) {
        try {
            JSONObject binlogJsonObj = JSON.parseObject(binlogJson);
            JSONObject payload = binlogJsonObj.getJSONObject("payload");

            // 只监听 c,u,d
            if (!"r".equals(payload.get("op"))) {
                log.warn("binlogJsonObj: {}", binlogJsonObj);
                JSONObject after = payload.getJSONObject("after");
                JSONObject source = payload.getJSONObject("source");
                String table = source.getString("table");
                if ("scm_contract_template".equals(table)) {

                }
            }
        } catch (Exception e) {
            log.error("binlog 监听时执行操作报错:{}", e.getMessage());
        }


    }
}

CDC 监听到的 Binlog JSON 数据

{
    "schema": {
        "name": "my-app-connector.dev_ahzilai.scm_contract_template.Envelope",
        "optional": false,
        "type": "struct",
        "fields": [
            {
                "field": "before",
                "name": "my-app-connector.dev_ahzilai.scm_contract_template.Value",
                "optional": true,
                "type": "struct",
                "fields": [
                    {
                        "field": "data_id",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "title",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "creator",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "default": 0,
                        "field": "create_time",
                        "name": "io.debezium.time.Timestamp",
                        "optional": false,
                        "type": "int64",
                        "version": 1
                    },
                    {
                        "field": "updater",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "default": 0,
                        "field": "update_time",
                        "name": "io.debezium.time.Timestamp",
                        "optional": false,
                        "type": "int64",
                        "version": 1
                    }
                ]
            },
            {
                "field": "after",
                "name": "my-app-connector.dev_ahzilai.scm_contract_template.Value",
                "optional": true,
                "type": "struct",
                "fields": [
                    {
                        "field": "data_id",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "title",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "creator",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "default": 0,
                        "field": "create_time",
                        "name": "io.debezium.time.Timestamp",
                        "optional": false,
                        "type": "int64",
                        "version": 1
                    },
                    {
                        "field": "updater",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "default": 0,
                        "field": "update_time",
                        "name": "io.debezium.time.Timestamp",
                        "optional": false,
                        "type": "int64",
                        "version": 1
                    }
                ]
            },
            {
                "field": "source",
                "name": "io.debezium.connector.mysql.Source",
                "optional": false,
                "type": "struct",
                "fields": [
                    {
                        "field": "version",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "connector",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "name",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "ts_ms",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "default": "false",
                        "field": "snapshot",
                        "name": "io.debezium.data.Enum",
                        "optional": true,
                        "type": "string",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        }
                    },
                    {
                        "field": "db",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "sequence",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "table",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "server_id",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "gtid",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "file",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "pos",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "row",
                        "optional": false,
                        "type": "int32"
                    },
                    {
                        "field": "thread",
                        "optional": true,
                        "type": "int64"
                    },
                    {
                        "field": "query",
                        "optional": true,
                        "type": "string"
                    }
                ]
            },
            {
                "field": "op",
                "optional": false,
                "type": "string"
            },
            {
                "field": "ts_ms",
                "optional": true,
                "type": "int64"
            },
            {
                "field": "transaction",
                "name": "event.block",
                "optional": true,
                "type": "struct",
                "fields": [
                    {
                        "field": "id",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "total_order",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "data_collection_order",
                        "optional": false,
                        "type": "int64"
                    }
                ],
                "version": 1
            }
        ],
        "version": 1
    },
    "payload": {
        "op": "r",
        "after": {
            "creator": "8f7343bc-b2ea-42c8-afdb-352910568f06",
            "update_time": 1701264228000,
            "create_time": 1700999411000,
            "data_id": "data1728622149796843520",
            "title": "价格确认函",
            "updater": "8f7343bc-b2ea-42c8-afdb-352910568f06"
        },
        "source": {
            "server_id": 0,
            "version": "2.3.4.Final",
            "file": "binlog.000196",
            "connector": "mysql",
            "pos": 1098539,
            "name": "my-app-connector",
            "row": 0,
            "ts_ms": 1703006585000,
            "snapshot": "last",
            "db": "dev_ahzilai",
            "table": "scm_contract_template"
        },
        "ts_ms": 1702956185460
    }
}
| 字段名称 | 描述 |
| --- | --- |
| op | 必需字符串,描述导致连接器生成该事件的操作类型。例如,c 表示该操作创建了一行(上面示例中的 op 为 r,即快照读取)。有效值为: |
c= 创建
u= 更新
d= 删除
r= 读取(仅适用于快照)

遇到的问题

tmp/offsets/ 文件夹不会自动创建,故需要在 MySQLBinlogConfig.java 中先创建该文件夹。

schema:
  history:
    class: io.debezium.storage.file.history.FileSchemaHistory
    file: tmp/history/db-history.dat # 会自动创建 history 文件夹 
offset:
  storage:
    file: tmp/offsets/offset.dat # 不会自动创建 offsets 文件夹
Last Updated 3/2/2024, 4:00:59 PM