从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

2023-12-20 06:33:59

Debezium 是一个开源的分布式平台,用于实时捕获和发布数据库更改事件。它可以将关系型数据库(如 MySQL、PostgreSQL、Oracle 等)的变更事件转化为可观察的流数据,以供其他应用程序实时消费和处理。本文中我们将采用 Debezium 与 Kafka 组合的方式来实现从 MySQL 到 DolphinDB 的数据同步。

Kafka + Debezium 的数据同步方案需要部署 4 个服务,如下所示

  • ZooKeeper:kafka 的依赖部署
  • Kafka:数据存储
  • Kafka-Connect:用于接入数据插件 source-connetor、sink-connector 的框架,可以提供高可用。也可以部署单实例版本。
  • Schema-Registry?:提供实时同步的数据的元数据注册功能 ,支持数据序列化。

基于 Debezium 的数据架构图如下:

接下来,本文将逐一介绍这些服务的下载、安装,以及配置数据同步任务。

部署 Kafka 单实例实时数据存储

基于 Kafka 的整套架构是支持高可用集群的。不过,即使部署单实例的服务,也可以达成数据同步任务。

本文将以单实例存储来进行介绍数据同步。

部署准备

首先下载程序包,Zookeeper(开源)、Kafka(开源)、Confluent(社区版),可以自行到官网下载最新稳定版本。

并将下面 4 个软件包放到 /opt 目录下。(软件、配置、数据路径文件较多。注意:初次试用请尽量保持路径一致。)

Confluent 下载会稍微麻烦点,需要选择 self-managed 然后录入信息,点击 start free 才能下载。注意下载 community 版本即可满足需要,我们只需要里头的?schema-registry?包。当然如果需要更好功能,也可以下载正式版,正式版包括了 Zookeeper、Kafka 以及管理、监控 Kafka 的更多功能。Confluent 是 Kafka 相关的商业公司。

以上 4 个程序包下载好之后,我们就可以开始部署了。

部署 Zookeeper

基础准备

第一步:创建部署用户

创建用户 kafka,授予 sudo 免密权限(需自行设置)。然后切换到 kafka 用户来进行操作(以下均为 kafka 用户操作)。

useradd kafka
su kafka

第二步:安装部署 java 环境

安装 java 到路径 /opt/java17,整套架构涉及的程序都是基于 java 虚拟机运行的。所以必须安装 java。

cd /opt
sudo mkdir -p /usr/local/java
sudo tar -xvf jdk-17.0.7_linux-x64_bin.tar.gz
sudo mv jdk-17.0.7 /usr/local/java/java17

设置 java 环境变量(kafka 用户下执行)。

vim ~/.bashrc
# 输入下面代码
JAVA_HOME=/usr/local/java/java17
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH

source ~/.bashrc
java --version

安装 Zookeeper

第一步:解压并安装 Zookeeper

3.7.1 版本的 Zookeeper 用户、组有默认值,这里我们需要调整一下。

cd /opt
sudo tar -xvf apache-zookeeper-3.7.1-bin.tar.gz
sudo mv apache-zookeeper-3.7.1-bin zookeeper
sudo chown -R root:root zookeeper
sudo chmod -R 755 zookeeper

第二步:准备 Zookeeper 的配置文件和存储文件

创建 zookeeper 的配置文件、数据文件、日志文件的存储路径。请尽量保持路径一致。篇末有打包的全部程序配置文件包。

sudo mkdir -p /KFDATA/zookeeper/etc
sudo mkdir -p /KFDATA/zookeeper/data
sudo mkdir -p /KFDATA/zookeeper/datalog
sudo mkdir -p /KFDATA/zookeeper/logs
sudo chown -R kafka:kafka /KFDATA
chmod -R 700 /KFDATA/zookeeper

准备 zookeeper 的配置文件?zoo.cfg。先从 zookeeper 安装路径下复制?log4j.properties?过来,然后进行修改。

说明:zookeeper 的不同版本?log4j.properties?配置内容会略有区别。如有不同,请按 log4j 的规则调整。

cd /KFDATA/zookeeper/etc
touch zoo.cfg
echo tickTime=2000 > zoo.cfg
echo initLimit=10 >>zoo.cfg
echo syncLimit=5 >>zoo.cfg
echo  dataDir=/KFDATA/zookeeper/data >>zoo.cfg
echo  dataLogDir=/KFDATA/zookeeper/datalog >>zoo.cfg
echo  clientPort=2181 >>zoo.cfg
sudo cp /opt/zookeeper/conf/log4j.properties ./
sudo chown kafka:kafka ./log4j.properties

修改?log4j.properties?中的?zookeeper.log.dir?参数

第三步:创建 Zookeeper 的启动文件

创建一个?zk.env?,配置 Zookeeper 启动所需环境变量,用于启动?service?文件调用。

cd /KFDATA/zookeeper/etc/
touch zk.env
echo JAVA_HOME=/usr/local/java/java17 > zk.env
echo PATH="/usr/local/java/java17/bin:/opt/zookeeper/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >> zk.env
echo ZOO_LOG_DIR=/KFDATA/zookeeper/logs >> zk.env
echo ZOO_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/zookeeper/etc/log4j.properties\" >> zk.env

如果对 Zookeeper 很熟练可以自行调用 Zookeeper 安装目录下的?bin?文件夹下的操作脚本来进行操作或测试。

使用?vim?命令编辑一个?service?文件。

sudo vim /usr/lib/systemd/system/zookeeper.service

录入以下启动命令信息,并保存。

[Unit]
Description=Apache Kafka - ZooKeeper
After=network.target

[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/zookeeper/etc/zk.env
ExecStart=/opt/zookeeper/bin/zkServer.sh start /KFDATA/zookeeper/etc/zoo.cfg
ExecStop=/opt/zookeeper/bin/zkServer.sh stop /KFDATA/zookeeper/etc/zoo.cfg
TimeoutStopSec=180
Restart=no


[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

第四步:创建测试脚本

(1)创建连接 Zookeeper 测试文件?zkCon.sh

mkdir -p /KFDATA/bin
cd /KFDATA/bin
touch zkCon.sh
echo export JAVA_HOME=/usr/local/java/java17 >zkCon.sh
echo export PATH="{$JAVE_HOME}/bin:/opt/zookeeper/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >>zkCon.sh
echo export ZOO_LOG_DIR=/KFDATA/zookeeper/logs >>zkCon.sh
echo export ZOO_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/zookeeper/etc/log4j.properties\" >>zkCon.sh
echo  '/opt/zookeeper/bin/zkCli.sh -server localhost:2181 -Dzookeeper.config.path=/KFDATA/zookeeper/zoo.cfg' >>zkCon.sh

对脚本授予执行权限。

chmod +x  zkCon.sh 

部署启动 Zookeeper

第一步:通过 systemctl 工具启动 Zookeeper 服务。

sudo systemctl start zookeeper.service

第二步:查看 Zookeeper 启动情况

可以通过?jps?命令 查看 java 进程, QuorumPeerMain 进程是 Zookeeper 的启动进程。

也可以通过?systemctl?命令查看,如图即是正常启动。

 sudo systemctl status zookeeper

第三步:通过客户端连接 Zookeeper ,并进行查看。

cd /KFDATA/bin/
./zkCon.sh
# 等待 zookeeper 命令行窗口
ls /
ls /zookeeper 

如果返回如下显示,表示 Zookeeper 启动成功,可以在 Zookeeper 中观察到自身的基础信息。

ctrl +c 可以退出 Zookeeper 客户端连接。

部署 Kafka

安装 Kafka

第一步:解压安装 Kafka 文件

执行以下命令,修改一下 Kafka 的安装文件名。

cd /opt
sudo tar -xvf kafka_2.13-3.4.1.tgz
sudo mv kafka_2.13-3.4.1 kafka

第二步:准备 Kafka 的配置文件和存储文件

创建 Kafka 的配置文件、数据文件、日志文件的存储路径。

mkdir -p /KFDATA/kafka/etc
mkdir -p /KFDATA/kafka/data
mkdir -p /KFDATA/kafka/logs

准备 Kafka 相关配置文件,创建启动配置文件,和日志配置文件。

cd /KFDATA/kafka/etc
touch kafka-server.properties
cp /opt/kafka/config/log4j.properties ./
cp /opt/kafka/config/tools-log4j.properties ./

修改?kafka-server.properties?文件中的配置,修改内容较多,文件如下,也可以自行录入。

############################# Server Basics #############################
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.189.130:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/KFDATA/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
log.retention.hours=-1
log.retention.bytes=21474836480 
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
auto.create.topics.enable=true
############################# Zookeeper #############################
zookeeper.connect=192.168.189.130:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=12000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
############################# message Settings #############################
message.max.byte=5242880

其中以下两项需要视具体环境修改,advertise.listeners 是对外监听端口。

advertised.listeners=PLAINTEXT://192.168.189.130:9092
zookeeper.connect=192.168.189.130:2181

第三步:准备 Kafka 的启动文件

创建 Kafka 启动的环境变量文件,这里配置了开启 JMX 监控端口,如果不需要,可以忽略后两项配置。

JMX 端口的作用是可以通过此端口连接,获取一些监控指标。

cd /KFDATA/kafka/etc
touch kf-server.env

echo PATH="/usr/local/java/java17/bin:/opt/zookeeper/bin:/opt/kafka:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >>kf-server.env
echo LOG_DIR="/KFDATA/kafka/logs/" >>kf-server.env
echo KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/kafka/etc/log4j.properties\" >>kf-server.env
echo KAFKA_JMX_OPTS=\"-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.189.130 -Djava.net.preferIPv4Stack=true\" >>kf-server.env
echo JMX_PORT=29999 >>kf-server.env

创建 Kafka 的?systemd service?文件,vim?打开一个文件。

sudo vim /usr/lib/systemd/system/kafka-server.service

录入以下内容,并保存。

[Unit]
Description=Apache Kafka - broker
After=network.target confluent-zookeeper.target

[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/kafka/etc/kf-server.env
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /KFDATA/kafka/etc/kafka-server.properties
ExecStop=/KFDATA/kafka/bin/kafka-server-stop.sh
LimitNOFILE=1000000
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

部署启动 Kafka

第一步:通过 systemctl 工具启动 Kafka 服务

执行下述命令启动 Kafka 服务:

sudo systemctl start kafka-server.service

第二步:查看 Kafka 启动情况

检查 Kafka 启动情况,可以连接 Zookeeper 客户端 。查看 zookeeper 中的数据。

cd /KFDATA/bin
./zkCon.sh
ls /

可以看到 Zookeeper 中已经多了一些 kafka 注册信息,如 brokers、cluseter、config、controller 等。

此时,可以测试创建一个 topic 进行测试:

cd  /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --create --topic test110

执行下述代码,查看当前 Kafka 中 topic 列表:

./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

如果返回上述图片显示内容,说明 Kafka 已经启动成功。

部署 Schema-Registry

Schema-Registry 是用于注册传输数据的数据结构的。并记录数据结构改变的每一个版本。数据写入 Kafka 和从 Kafka 中读出都需要 schema-registry 中记录的数据结构来进行序列化和反序列化。通过使用 schema-registry 来注册数据结构。Kafka 中只需保存序列化后的数据即可。可以减少数据的空间占用。

安装 Schema-Registry

第一步:解压安装 Schema-Registry 文件

Schema-Registry 程序是 confluent 程序包中一部分。所以这里我们要安装 conluent,社区版本即可。解压缩?confluent-community-7.4.0.tar.gz,并修改文件名,设置隶属组。

cd /opt
sudo tar -xvf confluent-community-7.4.0.tar.gz
sudo mv confluent-7.4.0 confluent
sudo chown -R root:root confluent
sudo chmod -R 755 confluent

第二步:准备 Schema-Registry 的配置文件和存储文件

创建 schema-registry 的配置、日志文件存储路径。

mkdir -p /KFDATA/schema-registry/etc
mkdir -p /KFDATA/schema-registry/logs

准备 schema-registry 的配置文件。

cd /KFDATA/schema-registry/etc
cp /opt/confluent/etc/schema-registry/schema-registry.properties ./
cp /opt/confluent/etc/schema-registry/log4j.properties ./

修改?schema-registry.properties?文件,修改连接的 Kafka Server 地址。

第三步:准备 Schema-Registry 的启动文件

创建 Schema-Registry 启动环境变量文件,用于 Schema-Registry 启动时使用。

touch schema-registry.env
echo PATH="/usr/local/java/java17/bin:/opt/confluent/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >schema-registry.env
echo LOG_DIR="/KFDATA/schema-registry/logs" >>schema-registry.env
echo LOG4J_DIR="/KFDATA/schema-registry/etc/log4j.properties" >>schema-registry.env
echo SCHEMA_REGISTRY_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/schema-registry/etc/log4j.properties\" >>schema-registry.env

创建 Schema-Registry 的 systemd service 启动文件。

sudo vim /usr/lib/systemd/system/schema-registry.service

录入以下内容并保存。

[Unit]
Description=RESTful Avro schema registry for Apache Kafka
After=network.target

[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/schema-registry/etc/schema-registry.env
ExecStart=/opt/confluent/bin/schema-registry-start -daemon /KFDATA/schema-registry/etc/schema-registry.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

部署启动 Schema-Registry

第一步:通过 systemctl 工具启动 Schema-Registry 服务

执行以下命令

sudo systemctl start schema-registry

第二步:查看 Schema-Registry 启动情况

通过 systemctl 工具查看启动状态。

sudo systemctl status schema-registry

查看 Kafka 中的 topic

cd /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

可以看到 kafka 中已经创建出了 schema-registry 需要使用的 topic

schema-registry 启动成功。

部署 Kafka-Connect

Kafka-Connect 是 Kafka 提供的 HA 框架,实现了 Kafka-Connect 接口的 connector(连接器),只需处理自己需要进行读取、写入数据任务。高可用部分由 kafka-connect 框架负责。

Kafka-Connect 可用通过 rest api 进行访问。

安装 Kafka-Connect

第一步:Kafka-Connect 安装

Kafka-Connect 由 Kafka 提供,启动程序在 Kafka 的安装路径下,已经存在。数据元数据注册由 schema-registry 处理。相应的序列化包在已安装 Confluent 路径下。故无需再安装程序包。

第二步:准备 Kafka-Connect 的配置文件和存储文件

创建 Kafka-Connect 的配置、日志文件存储路径

mkdir -p /KFDATA/kafka-connect/etc
mkdir -p /KFDATA/kafka-connect/logs

创建 Kafka-Connect 的配置文件

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

录入以下内容并保存。 ip 地址部分,需要视当前环境修改。

bootstrap.servers=192.168.189.130:9092
group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.189.130:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.189.130:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

plugin.path=/opt/confluent/share/java/plugin
rest.host.name=192.168.189.130
rest.port=8083
rest.advertised.host.name=192.168.189.130
rest.advertised.port=8083

offset.flush.timeout.ms=50000
offset.flush.interval.ms=10000
send.buffer.bytes=13107200
consumer.max.poll.records=10000
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

创建 Kafka-Connect 的 log4j 配置文件。

cd /KFDATA/kafka-connect/etc
cp /opt/kafka/config/connect-log4j.properties ./log4j.properties

修改文件中的以下参数配置

vim ./log4j.properties
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log

将其修改为

log4j.appender.connectAppender.File=/KFDATA/kafka-connect/logs/connect.log

第三步:准备 Kafka-Connect 的启动文件

创建 Kafka-Connect 启动环境变量文件。

cd /KFDATA/kafka-connect/etc
touch kafka-connect.env

echo PATH="/usr/local/java/java17/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >kafka-connect.env
echo LOG_DIR="/KFDATA/kafka-connect/logs/" >>kafka-connect.env
echo LOG4J_DIR="/KFDATA/kafka-connect/etc/log4j.properties" >>kafka-connect.env
echo KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/kafka-connect/etc/log4j.properties\" >>kafka-connect.env
echo CLASSPATH=/opt/confluent/share/java/schema-registry/*:/opt/confluent/share/java/kafka-serde-tools/*:/opt/confluent/share/java/confluent-common/* >>kafka-connect.env
echo JMX_PORT=29998 >>kafka-connect.env

创建 Kafka-Connect 的 systemd service 文件

sudo vim /usr/lib/systemd/system/kafka-connect.service

录入以下内容,并保存。

[Unit]
Description=Apache Kafka Connect - distributed
After=network.target

[Service]
Type=simple
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/kafka-connect/etc/kafka-connect.env
ExecStart=/opt/kafka/bin/connect-distributed.sh /KFDATA/kafka-connect/etc/kafka-connect.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

部署启动 Kafka-Connect

第一步:通过 systemctl 工具启动 Kafka-Connect 服务

执行以下命令

sudo systemctl start kafka-connect.service

第二步:查看 Kafka-Connect 启动情况

通过?jps?命令查看启动情况

jps -mlvV |grep connect

查看 Kafka 中的 topic 情况,Kafka-Connect 会在 Kafka 中创建 connect-configs 、connect-offsets、connect-statuses 三个 topic。

cd  /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

使用?curl?命令访问 kafka-connect,可以看到当前我们还没有配置 connector 任务

 curl -H "Accept:application/json" 192.168.189.130:8083/connectors/

部署 MySQL 数据同步到 Kafka

MySQL 的数据同步包括初始全量同步和 CDC 实时增量同步。

全量同步:将所选表的全部数据以 Insert 的方式写入 kafka,建议此时不要对数据库进行操作。

CDC 实时增量同步:从全量同步时记录的事务顺序号,实时读取 MySQL 的?binlog?日志,写入增量数据到 Kafka。

安装 Debezium-MySQL 连接器插件

配置启动 Debezium-MySQL 连接器,需要以下两步:

  1. 下载、安装 Debezium-MySQL 插件,并将插件路径配置到 Kafka Connect 配置文件中。
  2. 重新启动 Kafka Connect 程序,以加载插件。

第一步:下载安装 Debezium-MySQL 插件

官方网站?Debezium?,选择最新稳定版本进行下载。

选择 MySQL Connector Plug-in

创建插件路径(部署 kafka,kafka-connnect 环境的 kafka 用户),在此路径下解压 Debezium 的 MySQL 插件包

sudo mkdir -p /opt/confluent/share/java/plugin
cd /opt/confluent/share/java/plugin
sudo tar -xvf debezium-connector-mysql-2.3.2.Final-plugin.tar.gz
rm ./debezium-connector-mysql-2.3.2.Final-plugin.tar.gz

第二步:配置 Kafka-Connect 加载插件

修改 Kafka Connect 的配置文件,添加插件路径配置

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

添加或修改参数?plugin.path?如下

plugin.path=/opt/confluent/share/java/plugin

重新启动 Kafka Connect

sudo systemctl stop kafka-connect
sudo systemctl start kafka-connect

查看日志输出, 如下图所示,则插件加载成功。

cat /KFDATA/kafka-connect/logs/connect.log|grep mysql

配置 MySQL 数据库

做为 Source 数据库,我们基于 MySQL 的?binlog?来获取实时的增量数据,所以需要对 MySQL 数据库做一些设置。

第一步:创建数据同步用的 MySQL 用户

Debezium MySQL 连接器需要 MySQL 用户帐户。此 MySQL 用户必须对 Debezium MySQL 连接器捕获更改的所有数据库拥有适当的权限。

CREATE USER 'datasyn'@'%' IDENTIFIED BY '1234';

授予权限。

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'datasyn';

刷新授权表。

FLUSH PRIVILEGES;

第二步:设置 MySQL 参数

进行 CDC 同步,需要对 MySQL 数据库进行一些设置。

参数说明
server-id1MySQL 集群中用于标识一个 MySQL 服务器实例。可以自行调整设置。
log-binmysql-bin设置启用二进制日志功能,并指定日志文件名及存储位置。可自行调整设置。
binlog_formatROW必须 binlog-format 设置为 ROW 或 row。 连接 MySQL 级联复制实例时,链路内每个实例环节都要设置。
binlog_row_imageFULL必须 binlog_row_image 设置为 FULL 或 full 。连接 MySQL 级联复制实例时,链路内每个级联实例环节都要设置。
gtid_modeON设置开启全局事务标识
enforce_gtid_consistencyON设置强制执行 GTID 一致性
expire_logs_days3设置 MySQL 日志保留时间,MySQL 的 CDC 数据同步需要有对应日志文件才能进行同步。推荐至少设置保留3天。
binlog_row_value_options““此变量不能设置为 PARTIAL_JSON

参数参考代码:

[mysqld]
server-id = 1

log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
binlog_row_value_options=""

gtid_mode=ON
enforce_gtid_consistency=ON

expire_logs_days=3

配置 MySQL 数据同步连接任务

配置同步任务的及检查的很多命令都要带上?url?等参数。为了操作快捷,封装了一些加载配置文件的操作脚本,kafka-tools.tar?。下载当前包,解压缩到 /KFDATA 目录下 。后续的很多操作,检查 Kafka 的 topic,查看数据。配置同步任务等都会使用 kafka-tools 包中的脚本。请务必配置。包中的脚本都可以无参数运行,会输出 help。

cd /KFDATA
sudo tar -xvf kafka-tools.tar
sudo chown kafka:kafka kafka-tools
rm ./kafka-tools.tar

修改 kafka-tools/config/config.properties 配置参数。

按照本机的路径、IP 等对应修改 Kafka、Kafka_Connect 的启动 IP 地址,以及安装目录。

准备MySQL 数据库表

第一步:创建一个数据库

create database basicinfo;

第二步:创建两张表,并插入一些数据

创建表1?index_components,主键字段 4 个。

use basicinfo;
CREATE TABLE `index_components` (
  `trade_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `code` varchar(20) NOT NULL,
  `effDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `indexShortName` varchar(20) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL,
  `indexCode` varchar(20) NOT NULL,
  `secShortName` varchar(20) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL,
  `exchangeCD` varchar(4) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL,
  `weight` decimal(26,6) DEFAULT NULL,
  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `flag` int NOT NULL DEFAULT '1',
  PRIMARY KEY `index_components_pkey` (`trade_date`,`code`,`indexCode`,`flag`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; 

插入 4 条 数据

insert into index_components (trade_date,code,effdate,indexShortName,indexCode,secShortName,exchangeCD,weight,timestamp,flag)
values('2006-11-30','000759','2018-06-30 03:48:05','中证500','000905','中百集团','XSHE',0.0044,'2018-06-30 05:43:05',1),
('2006-11-30','000759','2018-06-30 04:47:05','中证500','000906','中百集团','XSHE',0.0011,'2018-06-30 05:48:06',1),
('2006-11-30','600031','2018-06-30 05:48:05','上证180','000010','三一重工','XSHG',0.0043,'2018-06-30 05:48:05',1),
('2006-11-30','600031','2018-06-30 06:48:02','沪深300','000300','三一重工','XSHG',0.0029,'2018-06-30 05:48:05',1);

创建表2?stock_basic?,主键字段 2 个。

CREATE TABLE `stock_basic` (
  `id` bigint NOT NULL ,
  `ts_code` varchar(20) NOT NULL,
  `symbol` varchar(20) DEFAULT NULL,
  `name` varchar(20) DEFAULT NULL,
  `area` varchar(20) DEFAULT NULL,
  `industry` varchar(40) DEFAULT NULL,
  `list_date` date DEFAULT NULL,
  PRIMARY KEY (`id`,`ts_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入 3 条数据 ;

insert into stock_basic(id,ts_code,symbol,name,area,industry,list_date)
values (1,'000001.SZ','000001','平安银行','深圳','银行','1991-04-03'),
(2,'000002.SZ','000002','万科A','深圳','地产','1991-01-29'),
(3,'000004.SZ','000004','ST国华','深圳','软件服务','1991-01-14')

准备连接器配置文件,并启动连接任务

第一步:准备 MySQL 同步任务配置文件

创建连接 MySQL 的 source 连接器配置文件。

mkdir /KFDATA/datasyn-config
cd /KFDATA/datasyn-config
vim source-mysql.json

录入以下配置 ,hostname 和 kafka 启动地址需对应修改。

{
    "name": "basicinfo-connector",
    "config":{
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "topic.prefix":"mysqlserver",
        "database.hostname": "192.168.189.130",
        "database.port": "3306",
        "database.user": "datasyn",
        "database.password": "1234",
        "database.server.id": "2223314",
        "database.include.list": "basicinfo",
        "schema.history.internal.kafka.bootstrap.servers": "192.168.189.130:9092",
        "schema.history.internal.kafka.topic": "schema-changes.basicinfo",
        "heartbeat.interval.ms":"20000"
    }
}

参数说明:以上参数为必填参数。更多详细参数说明可以参看?Debezium connector for MySQL :: Debezium Documentation

参数名称默认值参数说明
connector.class连接器的 Java 类的名称。这里是 mysql 的连接器类名。
tasks.max1当前 connector 的最大并行任务数。mysql 的 source 连接器任务数只能是 1。
topic.prefix当前 connector 同步写入任务的命名空间。会被用于添加到同步表对应 topic 名称前等
database.hostnameMySQL 数据库服务器的 IP 地址或主机名。
database.port3306MySQL 数据库服务器的整数端口号。
database.userMySQL 数据库服务器连接用户。
database.passwordMySQL 数据库服务器连接用户密码。
database.server.id用来模拟 MySQL 隶属进程的进程号。同步程序会以此数字 ID 加入 MySQL 集群。
database.influde.list匹配的数据库名。可以多个,用逗号分割即可。
schema.history.internal.kafka.bootstrap.servers数据同步记录 MySQL 的表结构信息的 kafka 连接
schema.history.internal.kafka.topic数据同步记录 MySQL 表结构的 topic 名称
heartbeat.interval.ms0当接到 MySQL 更改事件时,保证触发记录 binlog 事务位置或者 gtid 的间隔事件。(如果此值为 0 时,接收到不属于数据同步表的改变事件时,不会记录事务位置,可能导致当前记录的同步事务号大幅度落后 MySQL 的最新事务号)。

第二步:启动 MySQL 的数据同步任务

通过 rest api 启动 MySQL 的 source 连接器

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @/KFDATA/datasyn-config/source-mysql.json

也可以通过我们提供 kafka-tools 中的脚本启动,操作能简单一些

cd /KFDATA/kafka-tools/bin
./rest.sh create @/KFDATA/datasyn-config/source-mysql.json

第三步:查看 MySQL 数据同步任务状态

查看同步任务列表。list?参数展示任务名列表,showall?参数会显示全部同步任务状态。

./rest.sh list
./rest.sh showall

通过下图可以看到,connector 和 task 的状态都是 RUNNING,当前同步任务状态正常。

说明:每个同步任务会有一个 connector,可以多个 task。

使用 kafka-tools 的脚本?kafka.sh?查看 kafka 中的 topic

cd /KFDATA/kafka-tools/bin
./kafka.sh tplist|grep mysqlserver

下图中的 topic [mysqlserver.basicinfo.index_components] 即为我们的表?basicinfo.index_components?数据在 kafka 中的存储

查看 topic [mysqlserver.basicinfo.index_components] 中的数据条数。

./kafka.sh get_offsets mysqlserver.basicinfo.index_components

kafka 中已经同步了 MySQL 表?basicinfo.index_components?的 4 条数据。

说明: 在同步 MySQL 的初始快照数据时,不能中断。否则必须清理全部已同步数据,重新开始。即初始快照数据不支持断点续传。

部署 Kafka 数据同步到 DolphinDB

安装 Kafka-DolphinDB 连接器

配置启动 Kafka-DolphinDB 连接器插件,需要以下两步:

  1. 安装 Kafka-DolphinDB 插件,并将插件路径配置到 Kafka Connect 配置文件中。
  2. 重新启动 Kafka Connect 程序,以加载插件。

第一步:下载 Kafka-DolphinDB 插件

创建插件路径(部署 Kafka,Kafka-Connnect 环境的 kafka 用户),在此路径下放置 Kafka-DolphinDB 插件包,上面两个包都要放到此目录下。

sudo mkdir -p /opt/confluent/share/java/plugin/kafka-connect-jdbc

第二步:配置 Kafka-Connect 加载插件

Kafka-DolphinDB 插件包的父路径与前文 Debezium-MySQL 连接器插件路径均为 /opt/confluent/share/java/plugin/,因此无需再次配置到 Kafka-Connect 的配置文件中。

如果路径不一致,可以在?kafka-connect.properties?中的?plugin.path?参数里配置,以逗号分隔。

查看?plugin.path?参数配置:

cat /KFDATA/kafka-connect/etc/kafka-connect.properties |grep plugin

重新启动 Kafka Connect:

sudo systemctl stop kafka-connect
sudo systemctl start kafka-connect

查看日志输出

cat /KFDATA/kafka-connect/logs/connect.log|grep JdbcSinkConnector

出现下图中所示信息时,说明插件加载成功。

DolphinDB 的数据同步准备

第一步:创建同步的库、表

要求:当前支持数据同步,需要依赖 TSDB 引擎的?keepDuplicates?= LAST 数据来保证数据写入的幂等性,即发生数据重复时,两次及以上的相同增量数据写入,不影响数据的一致性。所以需要满足以下条件:

  1. DolphinDB 的表必须是 TSDB 引擎且设置?keepDuplicates?= LAST。
  2. TSDB 引擎目前不支持单字段?sortColumn?设置?keepDuplicates?= LAST,所以同步的 MySQL 目标表主键必 须是 2个及以上字段。
  3. sortColumn?最后的字段必须是时间或者数字。对应的 MySQL 目标表主键字段必须包含时间或数字。

分别创建之前 MySQL 中两张表的对应表:

  1. 创建 MySQL 表?basicinfo.index_components?的DolphinDB 对应分布式表 [dfs://index_data].[index_components]
def createIndexComDB(dbName){
	if(existsDatabase(dbName)){
	dropDatabase(dbName)
	}
	database(directory=dbName, partitionType=RANGE, partitionScheme= 1999.01M + (0..26)*12,engine="TSDB")
}
def createIndexCom(dbName,tbName){
	db=database(dbName)
             if(existsTable(dbName, tbName)){
                   db.dropTable(tbName)	
	}
	mtable=table(100:0, `trade_date`code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`timestamp`flag, [TIMESTAMP,SYMBOL,TIMESTAMP,SYMBOL,SYMBOL,SYMBOL,SYMBOL,DOUBLE,TIMESTAMP,INT]);
	db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`trade_date,sortColumns=`code`indexCode`flag`trade_date,compressMethods={trade_date:"delta"},keepDuplicates=LAST)
}
createIndexComDB("dfs://index_data")
createIndexCom("dfs://index_data",`index_components)

2. 创建 MySQL 表?basicinfo.stock_basic?的 DolphinDB 对应分布式表 [dfs://wddb].[stock_basic]

def createStockBasicDB(dbName){
	if(existsDatabase(dbName)){
	dropDatabase(dbName)
	}
	db=database(directory=dbName, partitionType=HASH, partitionScheme=[LONG, 1],engine="TSDB")
}
def createStockBasic(dbName,tbName){
	db=database(dbName)
             if(existsTable(dbName, tbName)){
                   db.dropTable(tbName)	
	}
             mtable=table(100:5, `id`ts_code`symbol`name`area`industry`list_date, [LONG,SYMBOL,SYMBOL,SYMBOL,SYMBOL,SYMBOL,DATE]);
	 db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`id,sortColumns=`ts_code`id,keepDuplicates=LAST,sortKeyMappingFunction=[hashBucket{,100}])
}
createStockBasicDB("dfs://wddb")
createStockBasic("dfs://wddb", `stock_basic)

第二步:配置同步配置表

DolphinDB 做为数据的接收端,本身无需做数据库上的额外设置,按正常使用配置即可。但由于 DolphinDB 中的数据存储表通常以分布式表为主,且分布式表是按照分区规则放置在不同的库名下,不同库名下的表是支持重名的。所以需要提供对于 DolphinDB 中表的同步配置信息。

  1. 在 DolphinDB 中创建一张配置表。库、表名可在后续操作中调整,但是表中字段名要保持一致。
  • 数据库名:dfs://ddb_sync_config
  • 表名:sync_config
dbName = "dfs://ddb_sync_config"
if(existsDatabase(dbName)){
    dropDatabase(dbName)
}
db=database(dbName, HASH, [SYMBOL, 5])

if(existsTable(dbName, "sync_config"))
    db.dropTable("sync_config")
mtable=table(100:0, `connector_name`topic_name`target_db`target_tab, [SYMBOL,SYMBOL,SYMBOL,SYMBOL]);
db.createTable(table=mtable, tableName="sync_config")

2. 插入配置表信息,配置 MySQL 表?basicinfo.index_components?和?basicinfo.stock_basic?对应的 kafka 中 topic 名称对应的 DolphinDB 分布式表

sync_config=loadTable("dfs://ddb_sync_config","sync_config");
tmp_tab=table(100:0,`connector_name`topic_name`target_db`target_tab, [SYMBOL,SYMBOL,SYMBOL,SYMBOL]);
insert into tmp_tab (connector_name,topic_name,target_db,target_tab) values ("ddb-sink","mysqlserver.basicinfo.index_components","dfs://index_data","index_components");
insert into tmp_tab (connector_name,topic_name,target_db,target_tab) values ("ddb-sink","mysqlserver.basicinfo.stock_basic","dfs://wddb","stock_basic");
sync_config.append!(tmp_tab);

表中数据如下:

注意:对于同一个 connector_name,相同的?topic_name?只能配置一条数据。配置分布式库、表必须在 DolphinDB 书库中存在。

字段名类型字段作用
connector_nameSymbol配置的 DolphinDB sink 同步任务名
topic_nameSymbol要同步的 kafka topic 名称
target_dbSymbol对应的 DolphinDB 分布式库名
target_tabSymbol对应的 DolphinDB 分布式表名

配置 DolphinDB 的数据同步连接任务

准备连接器配置文件,并启动连接任务

创建 DolphinDB 数据同步任务配置文件

cd /KFDATA/datasyn-config
vim ddb-sink.json

配置如下:

{
    "name": "ddb-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "2",
        "topics": "mysqlserver.basicinfo.index_components,mysqlserver.basicinfo.stock_basic",
        "connection.url": "jdbc:dolphindb://192.168.189.130:8848?user=admin&password=123456",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.evolve": "false",
        "insert.mode": "insert",
        "delete.enabled": "true",
        "batch.size":"10000",
        "pk.mode": "record_key",
        "ddbsync.config.table":"dfs://ddb_sync_config,sync_config"
    }
}

参数说明:以上参数项为同步 DolphinDB 所需参数。如果对 Confluent 的 JDBC Sink Connect 有经验,可适当调节。

参数名称默认值参数说明
name同步任务名称,不可重复。
connector.class连接器的 Java 类的名称。这里是 JdbcSink 的通用连接器类名。
tasks.max1当前 connector 的最大并行任务数。可以调节增大,会创建多 consumer 并行消费读取 Kafka 中数据。一般的数据同步场景设置到 10 基本可以满足同步速度上的需求。
topics配置要同步的 Kafka 中的 topic 名称,配置多个 topic 时用逗号分割。
connection.urlMySQL 数据库服务器的 IP 地址或主机名。
transforms声明数据转换操作。
transforms.unwrap.type声明数据转换器类别。请保持不变。
transforms.unwrap.drop.tombstonesfalse声明是否删除 Kafka 中的墓碑数据。
auto.evolvetrue当 DolphinDB 中缺少列时,是否自动增加列。当前不支持自动增加列,必须配置为 false。
insert.modeinsert数据插入模式。当前只支持 insert 模式。
pk.modenone主键模式。必须设置为 record_key。
delete.enabledfalse在主键模式为 record_key 情况下。对于 null 值 record 是否按照 delete 进行操作。
batch.size3000设置在数据量足够大时。以每批最大多少条来写入到目标数据库。注意:当该值大于 Connect worker 中设置的 consumer.max.pol.records 时,每次提交数量会受 consumer.max.pol.records 的值限制。
ddbsync.config.tabledfs://ddb_sync_config, sync_configKafka 中的 topic 对应 DolphinDB 表的配置表名称。可以自行定义库、表名称。但表中的字段要保持一致。表结构见“DolphinDB 的数据同步准备”。

通过 REST API 启动 source 连接器

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @ddb-sink.json

也可以通过我们提供?kafka-tools?中的脚本启动

cd /KFDATA/kafka-tools/bin
./rest.sh create @/KFDATA/datasyn-config/ddb-sink.json

查看同步任务列表。其中,”ddb-sink” 为 DolphinDB 数据同步程序。

./rest.sh list

查看 DolphinDB 的 sink 同步任务状态

./rest.sh status ddb-sink

通过下图可以看到,同步到 DolphinDB 的同步任务包含 1 个 connector 和 2 个 task 。两个 task 状态都是 RUNNING,即正常运行。这里配置了两个线程进行数据消费,并写入 DolphinDB。

查看 DolphinDB 中的数据

select * from loadTable('dfs://index_data', 'index_components');
select * from loadTable('dfs://wddb', 'stock_basic')

数据分别如下,两张表的初始数据均已经同步到了 DolphinDB 中。

实时数据同步验证

第一步:插入新数据

在 MySQL 中插入两条新数据。

insert into basicinfo.index_components (trade_date,code,effdate,indexShortName,indexCode,secShortName,exchangeCD,weight,timestamp,flag)
values
('2006-11-30','600051','2018-06-30 05:48:05','上证180','000010','三一重工','XXXB',0.0043,'2018-06-30 05:48:05',1),
('2006-11-30','600052','2018-06-30 06:48:02','沪深300','000300','三一重工','XSHG',0.0029,'2018-06-30 05:48:05',1)

在 DolphinDB 中进行查询,可以看到已经多了两条 code 值为 600051 和 600052 的。

select * from loadTable('dfs://index_data', 'index_components');

第二步:数据更新

在 MySQL 中更新一条数据,这里我们做一个涉及主键字段的更新。

update basicinfo.index_components set code='600061' where code ='600051'

在 DolphinDB 中进行查询,发现表中已经不存在 code 值为 600051 的数据,但可以看到一条 code 值为 600061 的数据。

select * from loadTable('dfs://index_data', 'index_components');

第三步:数据删除

从 MySQL 中删除一条数据。

delete from basicinfo.index_components where code='600061'

在 DolphinDB 中进行查询,可以看到 code 值为 600061 的数据已经不存在了。

运维操作

DolphinDB 同步须知

  1. DolphinDB 是一款支持海量数据的分布式时序数据库。针对不同的数据处理需求,在底层架构上天然上与通常的关系型数据库不同。所以需要有以下限制:
  • DolphinDB 的表没有主键设计,需要设置成?sortColumn?字段,并设置?keepDuplicates?= LAST 来进行去重,确保数据唯一。
  • DolphinDB 表采用 TSDB 引擎,才可以设置?sortColumn
  • DolphinDB 中 TSDB 引擎的?sortColumn?中必须要有时间列或者数字列,对应的来源主键则必须包含同样类型字段。
  • DolphinDB 中 TSDB 引擎的?sortColumn?中必须要有至少两个字段,才能设置?keepDuplicates?= LAST,所以对应的来源表主键必须是 2 个字段及以上。

2. DDL 语句相关:

  • 当前不支持 DDL 语句同步。
  • 当前不支持同时修改两边表后的数据传递。

部署检查

  1. 查看当前服务是否都在运行状态
sudo systemctl list-units |egrep 'zookeeper|kafka-server|schema-registry|kafka-connect'

也可以使用 Jps 等其他方法快速查看 Java 进程。

2. 运行以下命令查看当前的同步任务列表查询

查看当前有哪些同步任务:

./rest.sh list

3. 查看某个同步任务的状态

./rest.sh status ddb-sink

4. 暂停同步任务,该操作会停止当前整体 connector 同步任务

./rest.sh c_pause ddb-sink

5. 恢复同步任务

./rest.sh c_resume ddb-sink

对于曾经由于数据库报错一度暂停的同步任务,在错误消除后,只要 connector 运行正常,可以通过以下命令使其恢复同步:

./rest.sh t_restart ${connector_name} ${task_id}

6. 修改同步任务配置参数

./rest c_alter ${connector_name} @source_config.json

修改参数时,只需传递参数,不需要带有 connector name,格式示例如下:

数据同步情况检查

正常情况下,数据同步程序会保持稳定的数据同步。对于意外因素造成的数据未同步,可参考以下步骤逐一排查:

  1. 查看 MySQL 中binlog中记录的最新位置。

查看该值需要正确的配置?gtid_mode?等参数,按照前面的提供的 MySQL 参数配置既可。

SHOW MASTER STATUS;

查看 MySQL 中的?binlog?具体数据库更改。 可以通过?mysqlbinglog?命令查看 MySQL 的?binlog?中记录的数据库改变。

./mysqlbinlog --base64-output=decode-rows -v --skip-gtids /usr/local/mysql/data/binlog.000003|less

2. 查看 Kafka 中记录的 MySQL 同步的?binlog?位置。

结合前面查看的 MySQL 最新?binlog?位置,可以确定当前数据从 MySQL 到 Kafka 的同步进度。

./consume.sh --topic connect-offsets --from-beginning |grep basicinfo-connector

查看 Kafka 中数据, Kafka 中的数据是已序列化的二进制存储。需要使用 avro 调用 schema-registry 中的表结构信息及进行反序列化。这里我们提供了?tpconsumer.sh?脚本,可以提供反序列化后的 Kafka 中的真实数据,并匹配上该条数据对应的表结构。

./tpconsumer.sh --op=2 --topic=mysqlserver.basicinfo.index_components --offset=1 --max-messages=2

3. 查看当前 DolphinDB 同步任务列表。

下面命令可以查看当前 Kafka 中的消费组。

./kafka.sh cm_list

查看 DolphinDB 同步任务对应的 Kafka 消费组中的每一个 consumer 的消费进度,通过此命令可以查看同步程序中每一张的表同步进度。 Lag 为 0 则表示 Kafka 中 topic 当前没有未消费的数据,即 Kafka 中的数据与对应表的数据是一致的。

./kafka.sh cm_detail connect-ddb-sink|awk '{printf "%-20s %-40s %-9s %-14s %-15s %-10s %-30s\n", $1, $2, $3, $4, $5, $6,$7}'

附录

KFDATA.tar?压缩包包含:数据的同步数据文件夹、配置文件及?Kafka-tools?脚本。

文章来源:https://blog.csdn.net/qq_41996852/article/details/135077684
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。