obbinlog + Flink CDC Data Synchronization Verification
This article is an entry in the "Make Technology Visible | 2024 OceanBase Evangelist Program". More technology enthusiasts are welcome to join the OceanBase technical writing campaign, win prizes worth tens of thousands of yuan, and make code come alive through words together with us!
Preface
We were notified that our production data synchronization project will switch to the latest obbinlog tool to support Flink-based data synchronization. This article tests and verifies the setup with the community edition of OceanBase binlog (obbinlog-ce) v4.0.1.
The steps for deploying the obbinlog cluster are omitted here; refer to the official OceanBase documentation for the detailed procedure. Thanks also to 川粉 from the community for answering the tricky obbinlog questions.
Environment
| IP | Role | Version |
| --- | --- | --- |
| 10.0.0.62 | obbinlog | 4.0.1 |
| 10.0.0.62 | flink | 1.18.1 |
| 10.0.0.65 | OB source | 4.2.1.8 |
Flink installation
Flink depends on a Java runtime, so install a JDK on the host beforehand; a quick check/install sketch is shown below.
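A minimal check/install sketch, assuming a yum-based system such as CentOS 7 (Flink 1.18 runs on Java 8, 11 or 17; the package name below is an assumption, adjust it to your OS):
[root@observer062 ~]# java -version                                          # check whether a JDK is already present
[root@observer062 ~]# yum install -y java-11-openjdk java-11-openjdk-devel   # install OpenJDK 11 if it is missing
[root@observer062 ~]# java -version                                          # confirm the installation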
Upload the installation packages and extract the Flink archive
[root@observer062 opt]# ll
total 1641232
drwx--x--x 4 root root 28 aug 1 16:56 containerd
-rw-r--r-- 1 root root 481192147 dec 4 20:40 flink-1.18.1-bin-scala_2.12.tgz
-rw-r--r-- 1 root root 1199426304 oct 18 18:28 obbinlog-ce-4.0.1-1.el7.x86_64.rpm
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]# tar zxvf flink-1.18.1-bin-scala_2.12.tgz
flink-1.18.1/
flink-1.18.1/license
flink-1.18.1/bin/
flink-1.18.1/licenses/
flink-1.18.1/plugins/
flink-1.18.1/notice
flink-1.18.1/examples/
flink-1.18.1/lib/
flink-1.18.1/opt/
flink-1.18.1/log/
flink-1.18.1/readme.txt
flink-1.18.1/conf/
flink-1.18.1/conf/logback.xml
flink-1.18.1/conf/log4j-session.properties
flink-1.18.1/conf/logback-session.xml
flink-1.18.1/conf/flink-conf.yaml
flink-1.18.1/conf/log4j-cli.properties
flink-1.18.1/conf/logback-console.xml
flink-1.18.1/conf/zoo.cfg
flink-1.18.1/conf/workers
flink-1.18.1/conf/log4j-console.properties
flink-1.18.1/conf/masters
flink-1.18.1/conf/log4j.properties
flink-1.18.1/opt/flink-azure-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-queryable-state-runtime-1.18.1.jar
flink-1.18.1/opt/flink-table-planner_2.12-1.18.1.jar
flink-1.18.1/opt/python/
flink-1.18.1/opt/flink-s3-fs-presto-1.18.1.jar
flink-1.18.1/opt/flink-python-1.18.1.jar
flink-1.18.1/opt/flink-s3-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-oss-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-shaded-netty-tcnative-dynamic-2.0.59.final-17.0.jar
flink-1.18.1/opt/flink-sql-client-1.18.1.jar
flink-1.18.1/opt/flink-gs-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-cep-scala_2.12-1.18.1.jar
flink-1.18.1/opt/flink-state-processor-api-1.18.1.jar
flink-1.18.1/opt/flink-sql-gateway-1.18.1.jar
flink-1.18.1/opt/python/pyflink.zip
flink-1.18.1/opt/python/py4j-0.10.9.7-src.zip
flink-1.18.1/opt/python/cloudpickle-2.2.0-src.zip
flink-1.18.1/lib/flink-table-runtime-1.18.1.jar
flink-1.18.1/lib/log4j-api-2.17.1.jar
flink-1.18.1/lib/flink-connector-files-1.18.1.jar
flink-1.18.1/lib/flink-table-api-java-uber-1.18.1.jar
flink-1.18.1/lib/flink-scala_2.12-1.18.1.jar
flink-1.18.1/lib/log4j-1.2-api-2.17.1.jar
flink-1.18.1/lib/flink-cep-1.18.1.jar
flink-1.18.1/lib/log4j-core-2.17.1.jar
flink-1.18.1/lib/log4j-slf4j-impl-2.17.1.jar
flink-1.18.1/lib/flink-csv-1.18.1.jar
flink-1.18.1/lib/flink-table-planner-loader-1.18.1.jar
flink-1.18.1/lib/flink-json-1.18.1.jar
flink-1.18.1/lib/flink-dist-1.18.1.jar
flink-1.18.1/examples/python/
flink-1.18.1/examples/streaming/
flink-1.18.1/examples/table/
flink-1.18.1/examples/batch/
flink-1.18.1/examples/batch/webloganalysis.jar
flink-1.18.1/examples/batch/transitiveclosure.jar
flink-1.18.1/examples/batch/kmeans.jar
flink-1.18.1/examples/batch/connectedcomponents.jar
flink-1.18.1/examples/batch/enumtriangles.jar
flink-1.18.1/examples/batch/pagerank.jar
flink-1.18.1/examples/batch/distcp.jar
flink-1.18.1/examples/batch/wordcount.jar
flink-1.18.1/examples/table/gettingstartedexample.jar
flink-1.18.1/examples/table/streamsqlexample.jar
flink-1.18.1/examples/table/updatingtopcityexample.jar
flink-1.18.1/examples/table/advancedfunctionsexample.jar
flink-1.18.1/examples/table/streamwindowsqlexample.jar
flink-1.18.1/examples/table/wordcountsqlexample.jar
flink-1.18.1/examples/table/changelogsocketexample.jar
flink-1.18.1/examples/streaming/iteration.jar
flink-1.18.1/examples/streaming/topspeedwindowing.jar
flink-1.18.1/examples/streaming/windowjoin.jar
flink-1.18.1/examples/streaming/statemachineexample.jar
flink-1.18.1/examples/streaming/socketwindowwordcount.jar
flink-1.18.1/examples/streaming/wordcount.jar
flink-1.18.1/examples/streaming/sessionwindowing.jar
flink-1.18.1/examples/python/datastream/
flink-1.18.1/examples/python/table/
flink-1.18.1/examples/python/table/basic_operations.py
flink-1.18.1/examples/python/table/multi_sink.py
flink-1.18.1/examples/python/table/process_json_data_with_udf.py
flink-1.18.1/examples/python/table/process_json_data.py
flink-1.18.1/examples/python/table/windowing/
flink-1.18.1/examples/python/table/word_count.py
flink-1.18.1/examples/python/table/mixing_use_of_datastream_and_table.py
flink-1.18.1/examples/python/table/pandas/
flink-1.18.1/examples/python/table/streaming_word_count.py
flink-1.18.1/examples/python/table/pandas/pandas_udaf.py
flink-1.18.1/examples/python/table/pandas/conversion_from_dataframe.py
flink-1.18.1/examples/python/table/windowing/session_window.py
flink-1.18.1/examples/python/table/windowing/sliding_window.py
flink-1.18.1/examples/python/table/windowing/tumble_window.py
flink-1.18.1/examples/python/table/windowing/over_window.py
flink-1.18.1/examples/python/datastream/connectors/
flink-1.18.1/examples/python/datastream/basic_operations.py
flink-1.18.1/examples/python/datastream/process_json_data.py
flink-1.18.1/examples/python/datastream/windowing/
flink-1.18.1/examples/python/datastream/word_count.py
flink-1.18.1/examples/python/datastream/event_time_timer.py
flink-1.18.1/examples/python/datastream/streaming_word_count.py
flink-1.18.1/examples/python/datastream/state_access.py
flink-1.18.1/examples/python/datastream/windowing/session_with_dynamic_gap_window.py
flink-1.18.1/examples/python/datastream/windowing/tumbling_count_window.py
flink-1.18.1/examples/python/datastream/windowing/tumbling_time_window.py
flink-1.18.1/examples/python/datastream/windowing/sliding_time_window.py
flink-1.18.1/examples/python/datastream/windowing/session_with_gap_window.py
flink-1.18.1/examples/python/datastream/connectors/elasticsearch.py
flink-1.18.1/examples/python/datastream/connectors/kafka_avro_format.py
flink-1.18.1/examples/python/datastream/connectors/kafka_csv_format.py
flink-1.18.1/examples/python/datastream/connectors/kafka_json_format.py
flink-1.18.1/examples/python/datastream/connectors/pulsar.py
flink-1.18.1/plugins/metrics-statsd/
flink-1.18.1/plugins/metrics-prometheus/
flink-1.18.1/plugins/metrics-graphite/
flink-1.18.1/plugins/external-resource-gpu/
flink-1.18.1/plugins/metrics-jmx/
flink-1.18.1/plugins/metrics-influx/
flink-1.18.1/plugins/metrics-slf4j/
flink-1.18.1/plugins/readme.txt
flink-1.18.1/plugins/metrics-datadog/
flink-1.18.1/plugins/metrics-datadog/flink-metrics-datadog-1.18.1.jar
flink-1.18.1/plugins/metrics-slf4j/flink-metrics-slf4j-1.18.1.jar
flink-1.18.1/plugins/metrics-influx/flink-metrics-influxdb-1.18.1.jar
flink-1.18.1/plugins/metrics-jmx/flink-metrics-jmx-1.18.1.jar
flink-1.18.1/plugins/external-resource-gpu/gpu-discovery-common.sh
flink-1.18.1/plugins/external-resource-gpu/flink-external-resource-gpu-1.18.1.jar
flink-1.18.1/plugins/external-resource-gpu/nvidia-gpu-discovery.sh
flink-1.18.1/plugins/metrics-graphite/flink-metrics-graphite-1.18.1.jar
flink-1.18.1/plugins/metrics-prometheus/flink-metrics-prometheus-1.18.1.jar
flink-1.18.1/plugins/metrics-statsd/flink-metrics-statsd-1.18.1.jar
flink-1.18.1/licenses/license-aopalliance
flink-1.18.1/licenses/license.pyrolite
flink-1.18.1/licenses/license.threetenbp
flink-1.18.1/licenses/license.protobuf-java
flink-1.18.1/licenses/license.kryo
flink-1.18.1/licenses/license.jzlib
flink-1.18.1/licenses/license.protobuf.txt
flink-1.18.1/licenses/license.api-common
flink-1.18.1/licenses/license.slf4j-api
flink-1.18.1/licenses/license.antlr-runtime
flink-1.18.1/licenses/license.antlr-java-grammar-files
flink-1.18.1/licenses/license.automaton
flink-1.18.1/licenses/license.gax
flink-1.18.1/licenses/license.icu4j
flink-1.18.1/licenses/license.janino
flink-1.18.1/licenses/license.grizzled-slf4j
flink-1.18.1/licenses/license.jsr166y
flink-1.18.1/licenses/license.scala
flink-1.18.1/licenses/license.influx
flink-1.18.1/licenses/license.google-auth-library-oauth2-http
flink-1.18.1/licenses/license-re2j
flink-1.18.1/licenses/license.protobuf
flink-1.18.1/licenses/license-hdrhistogram
flink-1.18.1/licenses/license.jaxb
flink-1.18.1/licenses/license.javax.activation
flink-1.18.1/licenses/license.jdom
flink-1.18.1/licenses/license.google-auth-library-credentials
flink-1.18.1/licenses/license.asm
flink-1.18.1/licenses/license-stax2api
flink-1.18.1/licenses/license.py4j
flink-1.18.1/licenses/license.gax-httpjson
flink-1.18.1/licenses/license.protobuf-java-util
flink-1.18.1/licenses/license.minlog
flink-1.18.1/licenses/license.webbit
flink-1.18.1/licenses/license.base64
flink-1.18.1/licenses/license.jline
flink-1.18.1/bin/bash-java-utils.jar
flink-1.18.1/bin/stop-zookeeper-quorum.sh
flink-1.18.1/bin/historyserver.sh
flink-1.18.1/bin/flink-daemon.sh
flink-1.18.1/bin/flink
flink-1.18.1/bin/jobmanager.sh
flink-1.18.1/bin/stop-cluster.sh
flink-1.18.1/bin/sql-gateway.sh
flink-1.18.1/bin/pyflink-shell.sh
flink-1.18.1/bin/yarn-session.sh
flink-1.18.1/bin/standalone-job.sh
flink-1.18.1/bin/kubernetes-taskmanager.sh
flink-1.18.1/bin/kubernetes-jobmanager.sh
flink-1.18.1/bin/flink-console.sh
flink-1.18.1/bin/start-zookeeper-quorum.sh
flink-1.18.1/bin/sql-client.sh
flink-1.18.1/bin/config.sh
flink-1.18.1/bin/zookeeper.sh
flink-1.18.1/bin/kubernetes-session.sh
flink-1.18.1/bin/find-flink-home.sh
flink-1.18.1/bin/start-cluster.sh
flink-1.18.1/bin/taskmanager.sh
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]# ll
total 1641232
drwx--x--x 4 root root 28 aug 1 16:56 containerd
drwxr-xr-x 10 501 games 156 dec 20 2023 flink-1.18.1
-rw-r--r-- 1 root root 481192147 dec 4 20:40 flink-1.18.1-bin-scala_2.12.tgz
-rw-r--r-- 1 root root 1199426304 oct 18 18:28 obbinlog-ce-4.0.1-1.el7.x86_64.rpm
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#
Download the mysql-cdc connector
Go to the lib directory under the Flink installation path and download the mysql-cdc connector jar there:
[root@observer062 flink-1.18.1]# cd lib
[root@observer062 lib]# ll
total 203824
-rw-r--r-- 1 501 games 196578 dec 20 2023 flink-cep-1.18.1.jar
-rw-r--r-- 1 501 games 554431 dec 20 2023 flink-connector-files-1.18.1.jar
-rw-r--r-- 1 501 games 102376 dec 20 2023 flink-csv-1.18.1.jar
-rw-r--r-- 1 501 games 127072434 dec 20 2023 flink-dist-1.18.1.jar
-rw-r--r-- 1 501 games 202901 dec 20 2023 flink-json-1.18.1.jar
-rw-r--r-- 1 501 games 21058485 dec 20 2023 flink-scala_2.12-1.18.1.jar
-rw-r--r-- 1 501 games 15527125 dec 20 2023 flink-table-api-java-uber-1.18.1.jar
-rw-r--r-- 1 501 games 38216715 dec 20 2023 flink-table-planner-loader-1.18.1.jar
-rw-r--r-- 1 501 games 3437157 dec 20 2023 flink-table-runtime-1.18.1.jar
-rw-r--r-- 1 501 games 208006 sep 23 2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 501 games 301872 sep 23 2022 log4j-api-2.17.1.jar
-rw-r--r-- 1 501 games 1790452 sep 23 2022 log4j-core-2.17.1.jar
-rw-r--r-- 1 501 games 24279 sep 23 2022 log4j-slf4j-impl-2.17.1.jar
[root@observer062 lib]#
[root@observer062 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar
--2024-12-04 21:57:01-- https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar
resolving repo1.maven.org (repo1.maven.org)... 199.232.196.209, 199.232.192.209, 2a04:4e42:4c::209, ...
connecting to repo1.maven.org (repo1.maven.org)|199.232.196.209|:443... connected.
http request sent, awaiting response... 200 ok
length: 23598157 (23m) [application/java-archive]
saving to: ‘flink-sql-connector-mysql-cdc-2.4.0.jar’
100%[=====================================================================================================>] 23,598,157 217kb/s in 3m 24s
2024-12-04 22:00:26 (113 kb/s) - ‘flink-sql-connector-mysql-cdc-2.4.0.jar’ saved [23598157/23598157]
[root@observer062 lib]# ll
total 226872
-rw-r--r-- 1 501 games 196578 dec 20 2023 flink-cep-1.18.1.jar
-rw-r--r-- 1 501 games 554431 dec 20 2023 flink-connector-files-1.18.1.jar
-rw-r--r-- 1 501 games 102376 dec 20 2023 flink-csv-1.18.1.jar
-rw-r--r-- 1 501 games 127072434 dec 20 2023 flink-dist-1.18.1.jar
-rw-r--r-- 1 501 games 202901 dec 20 2023 flink-json-1.18.1.jar
-rw-r--r-- 1 501 games 21058485 dec 20 2023 flink-scala_2.12-1.18.1.jar
-rw-r--r-- 1 root root 23598157 jun 25 2023 flink-sql-connector-mysql-cdc-2.4.0.jar
-rw-r--r-- 1 501 games 15527125 dec 20 2023 flink-table-api-java-uber-1.18.1.jar
-rw-r--r-- 1 501 games 38216715 dec 20 2023 flink-table-planner-loader-1.18.1.jar
-rw-r--r-- 1 501 games 3437157 dec 20 2023 flink-table-runtime-1.18.1.jar
-rw-r--r-- 1 501 games 208006 sep 23 2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 501 games 301872 sep 23 2022 log4j-api-2.17.1.jar
-rw-r--r-- 1 501 games 1790452 sep 23 2022 log4j-core-2.17.1.jar
-rw-r--r-- 1 501 games 24279 sep 23 2022 log4j-slf4j-impl-2.17.1.jar
[root@observer062 lib]#
[root@observer062 lib]#
Configure Flink
Modify the IP/bind access parameters in flink-conf.yaml. Since this is a test environment I use 0.0.0.0 here; in production, set the actual IP addresses. The relevant lines are shown below, followed by a sketch of related settings.
taskmanager.host: localhost
rest.bind-address: 0.0.0.0
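For context, a hedged sketch of the related bind settings in conf/flink-conf.yaml; apart from the two lines above, everything is shown with Flink's defaults, and whether you also need to open jobmanager.bind-host / taskmanager.bind-host depends on your deployment:
# conf/flink-conf.yaml (excerpt, sketch only)
jobmanager.rpc.address: localhost   # address TaskManagers use to reach the JobManager
jobmanager.bind-host: localhost     # set to 0.0.0.0 if the JobManager must accept non-local connections
taskmanager.bind-host: localhost    # same for the TaskManager
taskmanager.host: localhost
rest.address: localhost             # address advertised for the REST endpoint / web UI
rest.bind-address: 0.0.0.0          # bind the web UI on all interfaces (test environment only)
rest.port: 8081                     # default web UI port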
Start Flink
[root@observer062 flink-1.18.1]# bin/start-cluster.sh
starting cluster.
starting standalonesession daemon on host observer062.
starting taskexecutor daemon on host observer062.
[root@observer062 flink-1.18.1]#
[root@observer062 flink-1.18.1]#
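To double-check that the standalone cluster is really up, jps should list one StandaloneSessionClusterEntrypoint (the JobManager) and one TaskManagerRunner (the TaskManager); a quick sketch:
[root@observer062 flink-1.18.1]# jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'   # both processes should appear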
Start the SQL client
The Flink SQL client starts successfully:
[root@observer062 flink-1.18.1]# ./bin/sql-client.sh embedded
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ beta
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
welcome! enter 'help;' to list all available commands. 'quit;' to exit.
command history file path: /root/.flink-sql-history
flink sql>
Open the Flink web UI
Being able to open the corresponding Flink web UI indicates that the Flink installation succeeded.
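If no browser can reach the host directly, the web UI's REST endpoint can also be probed from the shell, assuming the default rest.port of 8081 (a minimal sketch):
[root@observer062 ~]# curl -s http://10.0.0.62:8081/overview   # returns a small JSON cluster overview if Flink is up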
Create binlog instances
Log in to the binlog server and create the instances.
Here the binlogtest tenant of the metadb cluster on 10.0.0.65 is used as the binlog source. When creating it, replicate num is set to 2, which creates two binlog instances for binlogtest; at runtime one of them serves as a disaster-recovery instance to ensure high availability. As the official documentation explains, one OceanBase binlog instance can be compared to a MySQL instance, and multiple binlog instances of the same tenant can be compared to MySQL primary/standby instances.
See the official documentation for a detailed introduction.
[root@observer062 obbinlog]# mysql -h127.0.0.1 -p2983
welcome to the mysql monitor. commands end with ; or \g.
your mysql connection id is 1
server version: 5.7.35-obs
copyright (c) 2000, 2024, oracle and/or its affiliates.
oracle is a registered trademark of oracle corporation and/or its
affiliates. other names may be trademarks of their respective
owners.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
mysql>
mysql>
mysql>
mysql>
mysql> create binlog for tenant `metadb`.`binlogtest`
-> to user `root` password `aaaa11__`
-> with cluster url `http://10.0.0.65:8080/services?action=obrootserviceinfo&user_id=alibaba&uid=ocpmaster&obregion=metadb`
-> , replicate num 2;
query ok, 0 rows affected (0.14 sec)
mysql>
mysql>
mysql> show binlog instances;
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
| name | ob_cluster | ob_tenant | ip | port | zone | region | group | running | state | obcdc_running | obcdc_state | service_mode | convert_running | convert_delay | convert_rps | convert_eps | convert_iops | dumpers | version | odp_addr |
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
| bkyjce1f6q | metadb | binlogtest | 10.0.0.62 | 8101 | | | | yes | running | yes | running | enabled | no | 0 | 0 | 0 | 0 | 0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null |
| ujmbpx85wl | metadb | binlogtest | 10.0.0.62 | 8102 | | | | yes | running | yes | running | enabled | no | 0 | 0 | 0 | 0 | 0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null |
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
2 rows in set (0.02 sec)
Configure ODP
Set binlog_service_ip of the metadb cluster on the source side (10.0.0.65) to the IP of the binlog server.
Note
If you want the result that the binlog service returns for show create table to be fully MySQL-compatible (i.e., with OceanBase-specific extension syntax removed), you can set _show_ddl_in_compat_mode = 1.
ODP-CE v4.2.3 and later have removed the enable_binlog_service configuration item; if an error says the item does not exist, it can be ignored.
See the official documentation for details.
[root@server065 ~]# obclient -h10.0.0.65 -p2883 -uroot@sys#metadb -paaaa11__ -a oceanbase
welcome to the oceanbase. commands end with ; or \g.
your oceanbase connection id is 743981964
server version: oceanbase_ce 4.2.1.8 (r108000022024072217-3149c25ca2dadbb7707686ad02a1367b1b43e0b5) (built jul 23 2024 02:01:58)
copyright (c) 2000, 2018, oceanbase and/or its affiliates. all rights reserved.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
root@oceanbase>alter proxyconfig set binlog_service_ip='10.0.0.62:2983';
query ok, 0 rows affected (0.009 sec)
root@oceanbase>alter proxyconfig set enable_binlog_service='true';
error 5099 (42000): system config unknown
root@oceanbase>
root@oceanbase>alter proxyconfig set init_sql='set _show_ddl_in_compat_mode = 1;';
query ok, 0 rows affected (0.041 sec)
root@oceanbase>show proxyconfig like 'binlog_service_ip';
------------------- ---------------- ------------------------------------------------------------------------------------------------------------------------- ------------- --------------- ------- --------------
| name | value | info | need_reboot | visible_level | range | config_level |
------------------- ---------------- ------------------------------------------------------------------------------------------------------------------------- ------------- --------------- ------- --------------
| binlog_service_ip | 10.0.0.62:2983 | binlog service ip/hostname list, format ip1:sql_port1;hostname2:sql_port2, separate with ';'. prefer to use the forward | false | sys | | level_vip |
------------------- ---------------- ------------------------------------------------------------------------------------------------------------------------- ------------- --------------- ------- --------------
1 row in set (0.008 sec)
root@oceanbase>
Check the binlog position
Log in to the binlogtest tenant and run show master status; the binlog position is returned, which confirms the binlog instances were created successfully.
[root@server065 software]# obclient -h10.0.0.65 -p2883 -uroot@binlogtest#metadb -paaaa11__ -a oceanbase
welcome to the oceanbase. commands end with ; or \g.
your oceanbase connection id is 744006597
server version: oceanbase_ce 4.2.1.8 (r108000022024072217-3149c25ca2dadbb7707686ad02a1367b1b43e0b5) (built jul 23 2024 02:01:58)
copyright (c) 2000, 2018, oceanbase and/or its affiliates. all rights reserved.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
root@oceanbase>
root@oceanbase>
root@oceanbase>show matser status;
error 1064 (42000): you have an error in your sql syntax; check the manual that corresponds to your oceanbase version for the right syntax to use near 'matser status' at line 1
root@oceanbase>
root@oceanbase>show master status;
------------------ ---------- -------------- ------------------ -------------------
| file | position | binlog_do_db | binlog_ignore_db | executed_gtid_set |
------------------ ---------- -------------- ------------------ -------------------
| mysql-bin.000001 | 155 | | | |
------------------ ---------- -------------- ------------------ -------------------
1 row in set (0.204 sec)
root@oceanbase>
root@oceanbase>
Data synchronization
Create the business table test_cdc in the OB tenant
root@oceanbase>
root@oceanbase>use test;
database changed
root@test>
root@test>
root@test>create table `test_cdc` (
-> `id` int not null auto_increment,
-> `name` varchar(255) default null,
-> primary key (`id`)
-> )
-> ;
query ok, 0 rows affected (0.111 sec)
root@test>
root@test>show tables;
----------------
| tables_in_test |
----------------
| test_cdc |
----------------
1 row in set (0.004 sec)
root@test>
Create the corresponding sync table in Flink SQL
Notes:
The binlog service is designed to be fully MySQL-compatible, so the sync table should be created with the mysql-cdc connector DDL syntax.
In the WITH options of the mysql-cdc table, fill in the OBProxy address and port as hostname/port, and use the username and password of the tenant for which the binlog was created.
flink sql> create table test_flink_cdc (
> id int,
> name string,
> primary key(id) not enforced
> ) with (
> 'connector' = 'mysql-cdc',
> 'hostname' = '10.0.0.65',
> 'port' = '2883',
> 'username'='root@binlogtest#metadb',
> 'password'='aaaa11__',
> 'database-name'='test',
> 'table-name'='test_cdc'
> );
>
[info] execute statement succeed.
flink sql>
Since the OB source table test_cdc contains no data yet, querying the sync table from Flink SQL at this point returns nothing and the job stays in a waiting state; the query I use is sketched below.
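The check itself is just a plain SELECT in the Flink SQL client; tableau result mode is optional but prints the changelog directly in the terminal (a minimal sketch):
flink sql> SET 'sql-client.execution.result-mode' = 'tableau';  -- optional: print results inline instead of opening the result view
flink sql> SELECT * FROM test_flink_cdc;                        -- submits a streaming job that snapshots the table and then tails the binlog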
Insert data into the OB source table
Now insert some rows into the OB source table test_cdc:
root@test>insert into test_cdc values (001, 'test01');
query ok, 1 row affected (0.012 sec)
root@test>insert into test_cdc values (002, 'test02');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (003, 'test03');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (004, 'test04');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (005, 'test05');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (006, 'test06');
query ok, 1 row affected (0.003 sec)
root@test>
The Flink sync table receives the inserted rows
Querying the sync table through the Flink SQL client now shows the newly inserted rows; synchronization works.
The Flink UI shows the just-created Flink SQL job that consumes the binlog; a RUNNING status means it is healthy. The job can also be listed from the SQL client, as sketched below.
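The running job can also be checked from the SQL client itself; Flink 1.17+ supports listing jobs there (a minimal sketch):
flink sql> SHOW JOBS;   -- lists job id, job name, status and start time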
Update data in the OB source table
root@test>update test_cdc set name='testbinlog' where name='test06';
query ok, 1 row affected (0.005 sec)
rows matched: 1 changed: 1 warnings: 0
root@test>
The Flink sync table receives the update
Querying the sync table through the Flink SQL client now shows the updated row; synchronization works.
Binlog instance high availability
Release one binlog instance
[root@observer062 ~]# mysql -h127.0.0.1 -p2983
welcome to the mysql monitor. commands end with ; or \g.
your mysql connection id is 39
server version: 5.7.35-obs
copyright (c) 2000, 2024, oracle and/or its affiliates.
oracle is a registered trademark of oracle corporation and/or its
affiliates. other names may be trademarks of their respective
owners.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
mysql>
mysql>
mysql> show binlog instances;
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
| name | ob_cluster | ob_tenant | ip | port | zone | region | group | running | state | obcdc_running | obcdc_state | service_mode | convert_running | convert_delay | convert_rps | convert_eps | convert_iops | dumpers | version | odp_addr |
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
| ihiclibe34 | metadb | binlogtest | 10.0.0.62 | 8102 | | | | yes | running | yes | running | enabled | yes | 408 | 1 | 0 | 0 | 1 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null |
| xebn9ehbo2 | metadb | binlogtest | 10.0.0.62 | 8103 | | | | yes | running | yes | running | enabled | yes | 268 | 1 | 0 | 0 | 0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null |
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
2 rows in set (0.21 sec)
mysql>
mysql>
mysql>
mysql>
mysql> drop binlog instance ihiclibe34 force;
query ok, 0 rows affected (0.01 sec)
mysql> show binlog instances;
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
| name | ob_cluster | ob_tenant | ip | port | zone | region | group | running | state | obcdc_running | obcdc_state | service_mode | convert_running | convert_delay | convert_rps | convert_eps | convert_iops | dumpers | version | odp_addr |
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
| xebn9ehbo2 | metadb | binlogtest | 10.0.0.62 | 8103 | | | | yes | running | yes | running | enabled | yes | 460 | 1 | 0 | 0 | 0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null |
------------ ------------ ------------ ----------- ------ ------ -------- ------- --------- --------- --------------- ------------- -------------- ----------------- --------------- ------------- ------------- -------------- --------- ------------------------------------------------ ----------
1 row in set (0.01 sec)
mysql>
Verify data synchronization
After dropping one binlog instance, keep inserting new rows into the source table:
[root@server065 ~]# obclient -h10.0.0.65 -p2883 -uroot@binlogtest#metadb -paaaa11__ -a oceanbase
welcome to the oceanbase. commands end with ; or \g.
your oceanbase connection id is 743998788
server version: oceanbase_ce 4.2.1.8 (r108000022024072217-3149c25ca2dadbb7707686ad02a1367b1b43e0b5) (built jul 23 2024 02:01:58)
copyright (c) 2000, 2018, oceanbase and/or its affiliates. all rights reserved.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
root@oceanbase>use test;
database changed
root@test>
root@test>
root@test>
root@test>insert into test_cdc values (007, 'test07');
query ok, 1 row affected (0.002 sec)
root@test>insert into test_cdc values (008, 'test08');
query ok, 1 row affected (0.002 sec)
root@test>insert into test_cdc values (009, 'test09');
query ok, 1 row affected (0.002 sec)
root@test>
Log in to the Flink SQL client and SELECT from the sync table: the rows newly inserted into the source table have been synchronized to the downstream Flink CDC table, which demonstrates the high availability of multiple binlog instances.
Open the Flink web UI: the sync job is still running normally.
Summary
1. The binlog service is designed to be fully MySQL-compatible, so the corresponding sync table in Flink should be created with the Flink mysql-cdc connector DDL syntax.
2. In the WITH options of the mysql-cdc table, fill in the OBProxy address and port, and use the username and password of the tenant for which the binlog was created.
3. When creating binlog instances, it is recommended to specify the replicate num parameter to create multiple instances, ensuring high availability of the binlog service.