CDC 监听 Binlog
Siona
Debezium 监听 MySQL Binlog
1. pom.xml 导入依赖
<properties>
    <debezium.version>2.3.4.Final</debezium.version>
</properties>

<dependencies>
    <!-- Debezium: listen to the MySQL binlog -->
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${debezium.version}</version>
    </dependency>
</dependencies>
2. Debezium 配置文件
application.yml
# Debezium CDC: listen to the MySQL binlog.
# Nesting mirrors the property keys read by DebeziumProperties (e.g. debezium.database.server.id).
debezium:
  connector:
    name: my-connector-001
    class: io.debezium.connector.mysql.MySqlConnector
    tasks.max: 1
  database:
    hostname: 127.0.0.1
    port: 3306
    user: root
    password: 123456
    server:
      id: 1234
      name: my-server-1
    include:
      list: "dev_ahzilai"
  table:
    include:
      list: "dev_ahzilai.scm_contract_contract, dev_ahzilai.scm_contract_template"
  schemas:
    enable: false
  include:
    schema:
      changes: false
  schema:
    history:
      class: io.debezium.storage.file.history.FileSchemaHistory
      file: tmp/history/db-history.dat
  offset:
    storage:
      file: tmp/offsets/offset.dat
    flush:
      interval: 60000
  topic:
    prefix: "my-app-connector"
DebeziumProperties.java
配置类
package com.siona.dao.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Holder for the {@code debezium.*} settings used to configure the embedded Debezium engine.
 *
 * <p>Every field is injected by Spring from the property key named in its {@code @Value}
 * annotation; Lombok's {@code @Data} generates the accessors.
 *
 * @Version 1.0.0
 * @Date 2023/12/18 16:02
 * @Created by Siona
 */
@Data
@Component
public class DebeziumProperties {
// --- MySQL connection settings (debezium.database.*) ---
@Value("${debezium.database.hostname}")
private String hostname;
@Value("${debezium.database.port}")
private String port;
@Value("${debezium.database.user}")
private String user;
@Value("${debezium.database.password}")
private String password;
@Value("${debezium.database.server.id}")
private String serverId;
@Value("${debezium.database.server.name}")
private String serverName;
// --- Connector identity (debezium.connector.*) ---
@Value("${debezium.connector.name}")
private String connectorName;
@Value("${debezium.connector.class}")
private String connectorClass;
// --- Offset storage (debezium.offset.*) ---
@Value("${debezium.offset.storage.file}")
private String offsetStorageFileName;
// NOTE(review): bound to the same key as flushInterval below, only typed as String.
@Value("${debezium.offset.flush.interval}")
private String offsetFlushInterval;
// --- Capture filters ---
@Value("${debezium.database.include.list}")
private String databaseIncludeList;
@Value("${debezium.table.include.list}")
private String tableIncludeList;
// --- Schema-related switches ---
@Value("${debezium.schemas.enable}")
private String schemasEnable;
@Value("${debezium.include.schema.changes}")
private String includeSchemaChanges;
// --- Schema history storage (debezium.schema.history.*) ---
@Value("${debezium.schema.history.class}")
private String schemaHistoryClass;
@Value("${debezium.schema.history.file}")
private String schemaHistoryFile;
// Same property key as offsetFlushInterval above, converted to Long.
@Value("${debezium.offset.flush.interval}")
private Long flushInterval;
// --- Topic naming ---
@Value("${debezium.topic.prefix}")
private String topicPrefix;
}
3. Debezium 监听 MySQL 配置类
后期考虑拆分 connect 为 start 等方法
MySQLBinlogConfig.java
package com.siona.dao.config;
// NOTE(review): BinlogUpdateEvent is shown in package com.siona.dao.listener in this document; confirm this import.
import com.qx.dao.listener.BinlogUpdateEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Description TODO
* @Version 1.0.0
* @Date 2023/12/18 16:00
* @Created by Siona
*/
@Configuration
@Slf4j
@Data
public class MySQLBinlogConfig {
@Autowired
private DebeziumProperties debeziumProperties;
@Resource
ApplicationContext applicationContext;
DebeziumEngine<ChangeEvent<String, String>> engine;
@Bean
public void debeziumConfig() {
// Define the configuration for the embedded and MySQL connector ...
io.debezium.config.Configuration config = io.debezium.config.Configuration.create()
/* begin engine properties */
.with("connector.class", debeziumProperties.getConnectorClass())
.with("offset.storage", FileOffsetBackingStore.class.getCanonicalName())
.with("offset.storage.file.filename", debeziumProperties.getOffsetStorageFileName())
.with("offset.flush.interval.ms", debeziumProperties.getFlushInterval())
.with("schemas.enable", debeziumProperties.getSchemasEnable())
.with("include.schema.changes", debeziumProperties.getIncludeSchemaChanges())
/* begin connector properties */
.with("name", debeziumProperties.getConnectorName())
.with("database.hostname", debeziumProperties.getHostname())
.with("database.port", debeziumProperties.getPort())
.with("database.user", debeziumProperties.getUser())
.with("database.password", debeziumProperties.getPassword())
.with("database.include.list", debeziumProperties.getDatabaseIncludeList())
.with("table.include.list", debeziumProperties.getTableIncludeList())
.with("database.server.id", debeziumProperties.getServerId())
.with("database.server.name", debeziumProperties.getServerName())
.with("topic.prefix", debeziumProperties.getTopicPrefix())
.with("schema.history.internal", debeziumProperties.getSchemaHistoryClass())
.with("schema.history.internal.file.filename", debeziumProperties.getSchemaHistoryFile())
.build();
try {
// 2. 构建 DebeziumEngine
// 使用 Json 格式
engine = DebeziumEngine.create(Json.class)
.using(config.asProperties())
.notifying(record -> {
applicationContext.publishEvent(new BinlogUpdateEvent(record.value()));
})
.using((success, message, error) -> {
if (!success && error != null) {
log.error("Debezium 报错 Message:{}", message);
log.error("Debezium 报错 Error:{}", error.getMessage());
log.error("Debezium 重连中...");
connect(this.engine);
}
})
.build();
connect(engine);
} catch (Exception e) {
log.error("Debezium 启动失败:{}", e.getMessage());
connect(this.engine);
}
}
public void connect(DebeziumEngine<ChangeEvent<String, String>> engine){
// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
log.info("Debezium 已启动...");
}
}
package com.siona.dao.listener;
import org.springframework.context.ApplicationEvent;
/**
 * Spring application event that carries one Debezium change record as its source.
 *
 * @Version 1.0.0
 * @Date 2023/11/05 17:38
 * @Created by Siona
 */
public class BinlogUpdateEvent extends ApplicationEvent {
/**
 * Creates the event.
 *
 * @param source the change record as a JSON string; it becomes the event source and is
 *               read back through {@code getSource()} (java.util.EventObject rejects null)
 */
public BinlogUpdateEvent(String source) {
super(source);
}
}
package com.siona.dao.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* 同步事件监听
*
* @Description TODO
* @Version 1.0.0
* @Date 2023/11/05 17:37
* @Created by Siona
*/
@Component
@Slf4j
public class BinlogUpdateListener implements ApplicationListener<ApplicationEvent> {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof BinlogUpdateEvent) {
// binlog 更新事件
BinlogUpdateEvent binlogUpdateEvent = (BinlogUpdateEvent) event;
String binlogJson = (String) binlogUpdateEvent.getSource();
binlogUpdate(binlogJson);
}
}
/**
* 处理 binlog 更新
*
* @param binlogJson json 字符串
*/
private void binlogUpdate(String binlogJson) {
try {
JSONObject binlogJsonObj = JSON.parseObject(binlogJson);
JSONObject payload = binlogJsonObj.getJSONObject("payload");
// 只监听 c,u,d
if (!"r".equals(payload.get("op"))) {
log.warn("binlogJsonObj: {}", binlogJsonObj);
JSONObject after = payload.getJSONObject("after");
JSONObject source = payload.getJSONObject("source");
String table = source.getString("table");
if ("scm_contract_template".equals(table)) {
}
}
} catch (Exception e) {
log.error("binlog 监听时执行操作报错:{}", e.getMessage());
}
}
}
CDC 监听到的 Binlog JSON 数据
{
"schema": {
"name": "my-app-connector.dev_ahzilai.scm_contract_template.Envelope",
"optional": false,
"type": "struct",
"fields": [
{
"field": "before",
"name": "my-app-connector.dev_ahzilai.scm_contract_template.Value",
"optional": true,
"type": "struct",
"fields": [
{
"field": "data_id",
"optional": false,
"type": "string"
},
{
"field": "title",
"optional": false,
"type": "string"
},
{
"field": "creator",
"optional": false,
"type": "string"
},
{
"default": 0,
"field": "create_time",
"name": "io.debezium.time.Timestamp",
"optional": false,
"type": "int64",
"version": 1
},
{
"field": "updater",
"optional": false,
"type": "string"
},
{
"default": 0,
"field": "update_time",
"name": "io.debezium.time.Timestamp",
"optional": false,
"type": "int64",
"version": 1
}
]
},
{
"field": "after",
"name": "my-app-connector.dev_ahzilai.scm_contract_template.Value",
"optional": true,
"type": "struct",
"fields": [
{
"field": "data_id",
"optional": false,
"type": "string"
},
{
"field": "title",
"optional": false,
"type": "string"
},
{
"field": "creator",
"optional": false,
"type": "string"
},
{
"default": 0,
"field": "create_time",
"name": "io.debezium.time.Timestamp",
"optional": false,
"type": "int64",
"version": 1
},
{
"field": "updater",
"optional": false,
"type": "string"
},
{
"default": 0,
"field": "update_time",
"name": "io.debezium.time.Timestamp",
"optional": false,
"type": "int64",
"version": 1
}
]
},
{
"field": "source",
"name": "io.debezium.connector.mysql.Source",
"optional": false,
"type": "struct",
"fields": [
{
"field": "version",
"optional": false,
"type": "string"
},
{
"field": "connector",
"optional": false,
"type": "string"
},
{
"field": "name",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": false,
"type": "int64"
},
{
"default": "false",
"field": "snapshot",
"name": "io.debezium.data.Enum",
"optional": true,
"type": "string",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
}
},
{
"field": "db",
"optional": false,
"type": "string"
},
{
"field": "sequence",
"optional": true,
"type": "string"
},
{
"field": "table",
"optional": true,
"type": "string"
},
{
"field": "server_id",
"optional": false,
"type": "int64"
},
{
"field": "gtid",
"optional": true,
"type": "string"
},
{
"field": "file",
"optional": false,
"type": "string"
},
{
"field": "pos",
"optional": false,
"type": "int64"
},
{
"field": "row",
"optional": false,
"type": "int32"
},
{
"field": "thread",
"optional": true,
"type": "int64"
},
{
"field": "query",
"optional": true,
"type": "string"
}
]
},
{
"field": "op",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": true,
"type": "int64"
},
{
"field": "transaction",
"name": "event.block",
"optional": true,
"type": "struct",
"fields": [
{
"field": "id",
"optional": false,
"type": "string"
},
{
"field": "total_order",
"optional": false,
"type": "int64"
},
{
"field": "data_collection_order",
"optional": false,
"type": "int64"
}
],
"version": 1
}
],
"version": 1
},
"payload": {
"op": "r",
"after": {
"creator": "8f7343bc-b2ea-42c8-afdb-352910568f06",
"update_time": 1701264228000,
"create_time": 1700999411000,
"data_id": "data1728622149796843520",
"title": "价格确认函",
"updater": "8f7343bc-b2ea-42c8-afdb-352910568f06"
},
"source": {
"server_id": 0,
"version": "2.3.4.Final",
"file": "binlog.000196",
"connector": "mysql",
"pos": 1098539,
"name": "my-app-connector",
"row": 0,
"ts_ms": 1703006585000,
"snapshot": "last",
"db": "dev_ahzilai",
"table": "scm_contract_template"
},
"ts_ms": 1702956185460
}
}
字段名称 | 描述 |
---|---|
op | 必填字符串,描述导致连接器生成该事件的操作类型。上面的示例中 op 为 r,表示该记录来自快照读取。有效值:c = 创建,u = 更新,d = 删除,r = 读取(仅适用于快照)。 |
遇到的问题
tmp/offsets/ 文件夹不会自动创建,故需要在 MySQLBinlogConfig.java 中先创建该文件夹。
schema:
  history:
    class: io.debezium.storage.file.history.FileSchemaHistory
    file: tmp/history/db-history.dat # the history directory is created automatically
offset:
  storage:
    file: tmp/offsets/offset.dat # the offsets directory is NOT created automatically