obbinlog and Flink CDC Data Synchronization Verification

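2024-12-05 | User Practice
This article is an entry in the "Let Technology Be Seen | 2024 OceanBase Evangelist Program" writing contest. Technology enthusiasts are warmly invited to join the OceanBase technical writing contest, compete for the 10,000-yuan grand prize, and make code come alive in words with us!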

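Preface

I was notified that our production data synchronization project will switch to the latest obbinlog tool to support Flink data synchronization. This article tests and verifies that setup with the community edition of OceanBase Binlog V4.0.1.

The steps for building the obbinlog cluster are omitted here; refer to the official website for them. Thanks also to 川粉 from the community for answering my difficult questions about obbinlog.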

Environment

IP        | Role      | Version
10.0.0.62 | obbinlog  | 4.0.1
10.0.0.62 | Flink     | 1.18.1
10.0.0.65 | OB source | 4.2.1.8

Installing Flink

Flink depends on a Java runtime, so install a JDK on the host beforehand.

Upload and extract the installation package


[root@observer062 opt]# ll
total 1641232
drwx--x--x 4 root root         28 aug  1 16:56 containerd
-rw-r--r-- 1 root root  481192147 dec  4 20:40 flink-1.18.1-bin-scala_2.12.tgz
-rw-r--r-- 1 root root 1199426304 oct 18 18:28 obbinlog-ce-4.0.1-1.el7.x86_64.rpm
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]# tar zxvf flink-1.18.1-bin-scala_2.12.tgz
flink-1.18.1/
flink-1.18.1/license
flink-1.18.1/bin/
flink-1.18.1/licenses/
flink-1.18.1/plugins/
flink-1.18.1/notice
flink-1.18.1/examples/
flink-1.18.1/lib/
flink-1.18.1/opt/
flink-1.18.1/log/
flink-1.18.1/readme.txt
flink-1.18.1/conf/
flink-1.18.1/conf/logback.xml
flink-1.18.1/conf/log4j-session.properties
flink-1.18.1/conf/logback-session.xml
flink-1.18.1/conf/flink-conf.yaml
flink-1.18.1/conf/log4j-cli.properties
flink-1.18.1/conf/logback-console.xml
flink-1.18.1/conf/zoo.cfg
flink-1.18.1/conf/workers
flink-1.18.1/conf/log4j-console.properties
flink-1.18.1/conf/masters
flink-1.18.1/conf/log4j.properties
flink-1.18.1/opt/flink-azure-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-queryable-state-runtime-1.18.1.jar
flink-1.18.1/opt/flink-table-planner_2.12-1.18.1.jar
flink-1.18.1/opt/python/
flink-1.18.1/opt/flink-s3-fs-presto-1.18.1.jar
flink-1.18.1/opt/flink-python-1.18.1.jar
flink-1.18.1/opt/flink-s3-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-oss-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-shaded-netty-tcnative-dynamic-2.0.59.final-17.0.jar
flink-1.18.1/opt/flink-sql-client-1.18.1.jar
flink-1.18.1/opt/flink-gs-fs-hadoop-1.18.1.jar
flink-1.18.1/opt/flink-cep-scala_2.12-1.18.1.jar
flink-1.18.1/opt/flink-state-processor-api-1.18.1.jar
flink-1.18.1/opt/flink-sql-gateway-1.18.1.jar
flink-1.18.1/opt/python/pyflink.zip
flink-1.18.1/opt/python/py4j-0.10.9.7-src.zip
flink-1.18.1/opt/python/cloudpickle-2.2.0-src.zip
flink-1.18.1/lib/flink-table-runtime-1.18.1.jar
flink-1.18.1/lib/log4j-api-2.17.1.jar
flink-1.18.1/lib/flink-connector-files-1.18.1.jar
flink-1.18.1/lib/flink-table-api-java-uber-1.18.1.jar
flink-1.18.1/lib/flink-scala_2.12-1.18.1.jar
flink-1.18.1/lib/log4j-1.2-api-2.17.1.jar
flink-1.18.1/lib/flink-cep-1.18.1.jar
flink-1.18.1/lib/log4j-core-2.17.1.jar
flink-1.18.1/lib/log4j-slf4j-impl-2.17.1.jar
flink-1.18.1/lib/flink-csv-1.18.1.jar
flink-1.18.1/lib/flink-table-planner-loader-1.18.1.jar
flink-1.18.1/lib/flink-json-1.18.1.jar
flink-1.18.1/lib/flink-dist-1.18.1.jar
flink-1.18.1/examples/python/
flink-1.18.1/examples/streaming/
flink-1.18.1/examples/table/
flink-1.18.1/examples/batch/
flink-1.18.1/examples/batch/webloganalysis.jar
flink-1.18.1/examples/batch/transitiveclosure.jar
flink-1.18.1/examples/batch/kmeans.jar
flink-1.18.1/examples/batch/connectedcomponents.jar
flink-1.18.1/examples/batch/enumtriangles.jar
flink-1.18.1/examples/batch/pagerank.jar
flink-1.18.1/examples/batch/distcp.jar
flink-1.18.1/examples/batch/wordcount.jar
flink-1.18.1/examples/table/gettingstartedexample.jar
flink-1.18.1/examples/table/streamsqlexample.jar
flink-1.18.1/examples/table/updatingtopcityexample.jar
flink-1.18.1/examples/table/advancedfunctionsexample.jar
flink-1.18.1/examples/table/streamwindowsqlexample.jar
flink-1.18.1/examples/table/wordcountsqlexample.jar
flink-1.18.1/examples/table/changelogsocketexample.jar
flink-1.18.1/examples/streaming/iteration.jar
flink-1.18.1/examples/streaming/topspeedwindowing.jar
flink-1.18.1/examples/streaming/windowjoin.jar
flink-1.18.1/examples/streaming/statemachineexample.jar
flink-1.18.1/examples/streaming/socketwindowwordcount.jar
flink-1.18.1/examples/streaming/wordcount.jar
flink-1.18.1/examples/streaming/sessionwindowing.jar
flink-1.18.1/examples/python/datastream/
flink-1.18.1/examples/python/table/
flink-1.18.1/examples/python/table/basic_operations.py
flink-1.18.1/examples/python/table/multi_sink.py
flink-1.18.1/examples/python/table/process_json_data_with_udf.py
flink-1.18.1/examples/python/table/process_json_data.py
flink-1.18.1/examples/python/table/windowing/
flink-1.18.1/examples/python/table/word_count.py
flink-1.18.1/examples/python/table/mixing_use_of_datastream_and_table.py
flink-1.18.1/examples/python/table/pandas/
flink-1.18.1/examples/python/table/streaming_word_count.py
flink-1.18.1/examples/python/table/pandas/pandas_udaf.py
flink-1.18.1/examples/python/table/pandas/conversion_from_dataframe.py
flink-1.18.1/examples/python/table/windowing/session_window.py
flink-1.18.1/examples/python/table/windowing/sliding_window.py
flink-1.18.1/examples/python/table/windowing/tumble_window.py
flink-1.18.1/examples/python/table/windowing/over_window.py
flink-1.18.1/examples/python/datastream/connectors/
flink-1.18.1/examples/python/datastream/basic_operations.py
flink-1.18.1/examples/python/datastream/process_json_data.py
flink-1.18.1/examples/python/datastream/windowing/
flink-1.18.1/examples/python/datastream/word_count.py
flink-1.18.1/examples/python/datastream/event_time_timer.py
flink-1.18.1/examples/python/datastream/streaming_word_count.py
flink-1.18.1/examples/python/datastream/state_access.py
flink-1.18.1/examples/python/datastream/windowing/session_with_dynamic_gap_window.py
flink-1.18.1/examples/python/datastream/windowing/tumbling_count_window.py
flink-1.18.1/examples/python/datastream/windowing/tumbling_time_window.py
flink-1.18.1/examples/python/datastream/windowing/sliding_time_window.py
flink-1.18.1/examples/python/datastream/windowing/session_with_gap_window.py
flink-1.18.1/examples/python/datastream/connectors/elasticsearch.py
flink-1.18.1/examples/python/datastream/connectors/kafka_avro_format.py
flink-1.18.1/examples/python/datastream/connectors/kafka_csv_format.py
flink-1.18.1/examples/python/datastream/connectors/kafka_json_format.py
flink-1.18.1/examples/python/datastream/connectors/pulsar.py
flink-1.18.1/plugins/metrics-statsd/
flink-1.18.1/plugins/metrics-prometheus/
flink-1.18.1/plugins/metrics-graphite/
flink-1.18.1/plugins/external-resource-gpu/
flink-1.18.1/plugins/metrics-jmx/
flink-1.18.1/plugins/metrics-influx/
flink-1.18.1/plugins/metrics-slf4j/
flink-1.18.1/plugins/readme.txt
flink-1.18.1/plugins/metrics-datadog/
flink-1.18.1/plugins/metrics-datadog/flink-metrics-datadog-1.18.1.jar
flink-1.18.1/plugins/metrics-slf4j/flink-metrics-slf4j-1.18.1.jar
flink-1.18.1/plugins/metrics-influx/flink-metrics-influxdb-1.18.1.jar
flink-1.18.1/plugins/metrics-jmx/flink-metrics-jmx-1.18.1.jar
flink-1.18.1/plugins/external-resource-gpu/gpu-discovery-common.sh
flink-1.18.1/plugins/external-resource-gpu/flink-external-resource-gpu-1.18.1.jar
flink-1.18.1/plugins/external-resource-gpu/nvidia-gpu-discovery.sh
flink-1.18.1/plugins/metrics-graphite/flink-metrics-graphite-1.18.1.jar
flink-1.18.1/plugins/metrics-prometheus/flink-metrics-prometheus-1.18.1.jar
flink-1.18.1/plugins/metrics-statsd/flink-metrics-statsd-1.18.1.jar
flink-1.18.1/licenses/license-aopalliance
flink-1.18.1/licenses/license.pyrolite
flink-1.18.1/licenses/license.threetenbp
flink-1.18.1/licenses/license.protobuf-java
flink-1.18.1/licenses/license.kryo
flink-1.18.1/licenses/license.jzlib
flink-1.18.1/licenses/license.protobuf.txt
flink-1.18.1/licenses/license.api-common
flink-1.18.1/licenses/license.slf4j-api
flink-1.18.1/licenses/license.antlr-runtime
flink-1.18.1/licenses/license.antlr-java-grammar-files
flink-1.18.1/licenses/license.automaton
flink-1.18.1/licenses/license.gax
flink-1.18.1/licenses/license.icu4j
flink-1.18.1/licenses/license.janino
flink-1.18.1/licenses/license.grizzled-slf4j
flink-1.18.1/licenses/license.jsr166y
flink-1.18.1/licenses/license.scala
flink-1.18.1/licenses/license.influx
flink-1.18.1/licenses/license.google-auth-library-oauth2-http
flink-1.18.1/licenses/license-re2j
flink-1.18.1/licenses/license.protobuf
flink-1.18.1/licenses/license-hdrhistogram
flink-1.18.1/licenses/license.jaxb
flink-1.18.1/licenses/license.javax.activation
flink-1.18.1/licenses/license.jdom
flink-1.18.1/licenses/license.google-auth-library-credentials
flink-1.18.1/licenses/license.asm
flink-1.18.1/licenses/license-stax2api
flink-1.18.1/licenses/license.py4j
flink-1.18.1/licenses/license.gax-httpjson
flink-1.18.1/licenses/license.protobuf-java-util
flink-1.18.1/licenses/license.minlog
flink-1.18.1/licenses/license.webbit
flink-1.18.1/licenses/license.base64
flink-1.18.1/licenses/license.jline
flink-1.18.1/bin/bash-java-utils.jar
flink-1.18.1/bin/stop-zookeeper-quorum.sh
flink-1.18.1/bin/historyserver.sh
flink-1.18.1/bin/flink-daemon.sh
flink-1.18.1/bin/flink
flink-1.18.1/bin/jobmanager.sh
flink-1.18.1/bin/stop-cluster.sh
flink-1.18.1/bin/sql-gateway.sh
flink-1.18.1/bin/pyflink-shell.sh
flink-1.18.1/bin/yarn-session.sh
flink-1.18.1/bin/standalone-job.sh
flink-1.18.1/bin/kubernetes-taskmanager.sh
flink-1.18.1/bin/kubernetes-jobmanager.sh
flink-1.18.1/bin/flink-console.sh
flink-1.18.1/bin/start-zookeeper-quorum.sh
flink-1.18.1/bin/sql-client.sh
flink-1.18.1/bin/config.sh
flink-1.18.1/bin/zookeeper.sh
flink-1.18.1/bin/kubernetes-session.sh
flink-1.18.1/bin/find-flink-home.sh
flink-1.18.1/bin/start-cluster.sh
flink-1.18.1/bin/taskmanager.sh
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]# ll
total 1641232
drwx--x--x  4 root root          28 aug  1 16:56 containerd
drwxr-xr-x 10  501 games        156 dec 20  2023 flink-1.18.1
-rw-r--r--  1 root root   481192147 dec  4 20:40 flink-1.18.1-bin-scala_2.12.tgz
-rw-r--r--  1 root root  1199426304 oct 18 18:28 obbinlog-ce-4.0.1-1.el7.x86_64.rpm
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#
[root@observer062 opt]#


Download the mysql-cdc connector

Go to the lib directory under the Flink installation path and download the mysql-cdc connector package.

[root@observer062 flink-1.18.1]# cd lib
[root@observer062 lib]# ll
total 203824
-rw-r--r-- 1 501 games    196578 dec 20  2023 flink-cep-1.18.1.jar
-rw-r--r-- 1 501 games    554431 dec 20  2023 flink-connector-files-1.18.1.jar
-rw-r--r-- 1 501 games    102376 dec 20  2023 flink-csv-1.18.1.jar
-rw-r--r-- 1 501 games 127072434 dec 20  2023 flink-dist-1.18.1.jar
-rw-r--r-- 1 501 games    202901 dec 20  2023 flink-json-1.18.1.jar
-rw-r--r-- 1 501 games  21058485 dec 20  2023 flink-scala_2.12-1.18.1.jar
-rw-r--r-- 1 501 games  15527125 dec 20  2023 flink-table-api-java-uber-1.18.1.jar
-rw-r--r-- 1 501 games  38216715 dec 20  2023 flink-table-planner-loader-1.18.1.jar
-rw-r--r-- 1 501 games   3437157 dec 20  2023 flink-table-runtime-1.18.1.jar
-rw-r--r-- 1 501 games    208006 sep 23  2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 501 games    301872 sep 23  2022 log4j-api-2.17.1.jar
-rw-r--r-- 1 501 games   1790452 sep 23  2022 log4j-core-2.17.1.jar
-rw-r--r-- 1 501 games     24279 sep 23  2022 log4j-slf4j-impl-2.17.1.jar
[root@observer062 lib]#
[root@observer062 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar
--2024-12-04 21:57:01--  https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar
resolving repo1.maven.org (repo1.maven.org)... 199.232.196.209, 199.232.192.209, 2a04:4e42:4c::209, ...
connecting to repo1.maven.org (repo1.maven.org)|199.232.196.209|:443... connected.
http request sent, awaiting response... 200 ok
length: 23598157 (23m) [application/java-archive]
saving to: ‘flink-sql-connector-mysql-cdc-2.4.0.jar’
100%[=====================================================================================================>] 23,598,157   217kb/s   in 3m 24s
2024-12-04 22:00:26 (113 kb/s) - ‘flink-sql-connector-mysql-cdc-2.4.0.jar’ saved [23598157/23598157]
[root@observer062 lib]# ll
total 226872
-rw-r--r-- 1  501 games    196578 dec 20  2023 flink-cep-1.18.1.jar
-rw-r--r-- 1  501 games    554431 dec 20  2023 flink-connector-files-1.18.1.jar
-rw-r--r-- 1  501 games    102376 dec 20  2023 flink-csv-1.18.1.jar
-rw-r--r-- 1  501 games 127072434 dec 20  2023 flink-dist-1.18.1.jar
-rw-r--r-- 1  501 games    202901 dec 20  2023 flink-json-1.18.1.jar
-rw-r--r-- 1  501 games  21058485 dec 20  2023 flink-scala_2.12-1.18.1.jar
-rw-r--r-- 1 root root   23598157 jun 25  2023 flink-sql-connector-mysql-cdc-2.4.0.jar
-rw-r--r-- 1  501 games  15527125 dec 20  2023 flink-table-api-java-uber-1.18.1.jar
-rw-r--r-- 1  501 games  38216715 dec 20  2023 flink-table-planner-loader-1.18.1.jar
-rw-r--r-- 1  501 games   3437157 dec 20  2023 flink-table-runtime-1.18.1.jar
-rw-r--r-- 1  501 games    208006 sep 23  2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1  501 games    301872 sep 23  2022 log4j-api-2.17.1.jar
-rw-r--r-- 1  501 games   1790452 sep 23  2022 log4j-core-2.17.1.jar
-rw-r--r-- 1  501 games     24279 sep 23  2022 log4j-slf4j-impl-2.17.1.jar
[root@observer062 lib]#
[root@observer062 lib]#


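Configure Flink

Edit the IP access control parameters in flink-conf.yaml. Because this is a test environment, I set the bind address to 0.0.0.0; in production, set it to the actual IP.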

taskmanager.host: localhost
rest.bind-address: 0.0.0.0



[root@observer062 flink-1.18.1]# bin/start-cluster.sh
starting cluster.
starting standalonesession daemon on host observer062.
starting taskexecutor daemon on host observer062.
[root@observer062 flink-1.18.1]#
[root@observer062 flink-1.18.1]#


Start the client

The Flink SQL client starts successfully:

[root@observer062 flink-1.18.1]# ./bin/sql-client.sh embedded
                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
    ______ _ _       _       _____  ____  _         _____ _ _            _  beta
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
        welcome! enter 'help;' to list all available commands. 'quit;' to exit.
command history file path: /root/.flink-sql-history
flink sql>


Open the Flink web UI

If you can log in to the Flink web UI, Flink has been installed successfully.



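Create the binlog instances

Log in to the binlog server and create the instances

Here the binlogtest tenant in the metadb cluster on 10.0.0.65 is used for the binlog instance. Creating it with replicate num 2 produces two binlog instances for binlogtest; at run time one of them acts as a disaster-recovery instance to ensure high availability. As the official documentation explains, one OceanBase Database binlog instance is comparable to one MySQL instance, and multiple OceanBase Database binlog instances of the same tenant are comparable to MySQL primary and standby instances.

See the official website for details.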


[root@observer062 obbinlog]# mysql -h127.0.0.1 -P2983
welcome to the mysql monitor.  commands end with ; or \g.
your mysql connection id is 1
server version: 5.7.35-obs
copyright (c) 2000, 2024, oracle and/or its affiliates.
oracle is a registered trademark of oracle corporation and/or its
affiliates. other names may be trademarks of their respective
owners.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
mysql>
mysql>
mysql>
mysql>
mysql> create binlog for tenant `metadb`.`binlogtest`
    ->        to user `root` password `aaaa11__`
    ->        with cluster url `http://10.0.0.65:8080/services?action=obrootserviceinfo&user_id=alibaba&uid=ocpmaster&obregion=metadb`
    ->        , replicate num 2;
query ok, 0 rows affected (0.14 sec)
mysql>
mysql>
mysql> show binlog instances;
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
| name       | ob_cluster | ob_tenant  | ip        | port | zone | region | group | running | state   | obcdc_running | obcdc_state | service_mode | convert_running | convert_delay | convert_rps | convert_eps | convert_iops | dumpers | version                                        | odp_addr |
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
| bkyjce1f6q | metadb     | binlogtest | 10.0.0.62 | 8101 |      |        |       | yes     | running | yes           | running     | enabled      | no              |             0 |           0 |           0 |            0 |       0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null     |
| ujmbpx85wl | metadb     | binlogtest | 10.0.0.62 | 8102 |      |        |       | yes     | running | yes           | running     | enabled      | no              |             0 |           0 |           0 |            0 |       0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null     |
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
2 rows in set (0.02 sec)
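
When the same setup is repeated across test environments, the statement above can be assembled from parameters. The sketch below is only an illustration: build_create_binlog_sql is a hypothetical helper, it mirrors the statement shape shown in the session above, and it does no escaping beyond rejecting backticks.

```python
def build_create_binlog_sql(cluster, tenant, user, password, cluster_url, replicate_num=1):
    """Assemble a CREATE BINLOG statement shaped like the one in the session above.

    Hypothetical helper: it only formats text and never contacts the binlog server.
    """
    values = (cluster, tenant, user, password, cluster_url)
    # Every value is wrapped in backticks, so an embedded backtick would break quoting.
    if any("`" in value for value in values):
        raise ValueError("parameters must not contain backticks")
    if replicate_num < 1:
        raise ValueError("replicate_num must be at least 1")
    return (
        f"create binlog for tenant `{cluster}`.`{tenant}` "
        f"to user `{user}` password `{password}` "
        f"with cluster url `{cluster_url}`, "
        f"replicate num {replicate_num};"
    )


# Values copied from the session above.
sql = build_create_binlog_sql(
    cluster="metadb",
    tenant="binlogtest",
    user="root",
    password="aaaa11__",
    cluster_url="http://10.0.0.65:8080/services?action=obrootserviceinfo&user_id=alibaba&uid=ocpmaster&obregion=metadb",
    replicate_num=2,
)
print(sql)
```

Keeping replicate_num as an explicit argument makes it harder to forget the second instance that the high-availability test later relies on.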



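Configure ODP

Set binlog_service_ip of the source metadb cluster on 10.0.0.65 to the address of the binlog server.

Note

If you want the result of show create table returned through the binlog service to be fully compatible with MySQL syntax (that is, with the OceanBase Database extended syntax removed), you can run set _show_ddl_in_compat_mode = 1.

odp_ce V4.2.3 and later have removed the enable_binlog_service configuration item. If an error reports that this item does not exist, it can be ignored.

See the official documentation for details.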

[root@server065 ~]# obclient -h10.0.0.65 -P2883 -uroot@sys#metadb -paaaa11__ -A oceanbase
welcome to the oceanbase.  commands end with ; or \g.
your oceanbase connection id is 743981964
server version: oceanbase_ce 4.2.1.8 (r108000022024072217-3149c25ca2dadbb7707686ad02a1367b1b43e0b5) (built jul 23 2024 02:01:58)
copyright (c) 2000, 2018, oceanbase and/or its affiliates. all rights reserved.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
root@oceanbase>alter proxyconfig set binlog_service_ip='10.0.0.62:2983';
query ok, 0 rows affected (0.009 sec)
root@oceanbase>alter proxyconfig set enable_binlog_service='true';
error 5099 (42000): system config unknown
root@oceanbase>
root@oceanbase>alter proxyconfig set init_sql='set _show_ddl_in_compat_mode = 1;';
query ok, 0 rows affected (0.041 sec)
root@oceanbase>show proxyconfig like 'binlog_service_ip';
+-------------------+----------------+-------------------------------------------------------------------------------------------------------------------------+-------------+---------------+-------+--------------+
| name              | value          | info                                                                                                                    | need_reboot | visible_level | range | config_level |
+-------------------+----------------+-------------------------------------------------------------------------------------------------------------------------+-------------+---------------+-------+--------------+
| binlog_service_ip | 10.0.0.62:2983 | binlog service ip/hostname list, format ip1:sql_port1;hostname2:sql_port2, separate with ';'. prefer to use the forward | false       | sys           |       | level_vip    |
+-------------------+----------------+-------------------------------------------------------------------------------------------------------------------------+-------------+---------------+-------+--------------+
1 row in set (0.008 sec)
root@oceanbase>
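
The info column above describes the value format as ip1:sql_port1;hostname2:sql_port2, separated by ';'. Before applying a value it can help to check it locally. The following is a rough sketch under that stated format only; parse_binlog_service_ip is a hypothetical helper, not part of ODP, and the host name binlog-host in the usage lines is made up for illustration.

```python
def parse_binlog_service_ip(value):
    """Split 'host1:port1;host2:port2' into a list of (host, port) tuples.

    Hypothetical validator based only on the format string shown by
    'show proxyconfig'; it does not contact ODP or the binlog server.
    """
    endpoints = []
    for item in value.split(";"):
        item = item.strip()
        if not item:
            continue  # tolerate a trailing ';'
        host, sep, port_text = item.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {item!r}")
        try:
            port = int(port_text)
        except ValueError:
            raise ValueError(f"port is not a number in {item!r}") from None
        if not 0 < port < 65536:
            raise ValueError(f"port out of range in {item!r}")
        endpoints.append((host, port))
    if not endpoints:
        raise ValueError("no endpoints given")
    return endpoints


print(parse_binlog_service_ip("10.0.0.62:2983"))
# 'binlog-host' is a made-up host name used only for this example.
print(parse_binlog_service_ip("10.0.0.62:2983;binlog-host:2984"))
```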



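Check the binlog position

Log in to the binlogtest tenant and run show master status. The binlog position is displayed, which confirms that the binlog instance was created successfully.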

[root@server065 software]# obclient -h10.0.0.65 -P2883 -uroot@binlogtest#metadb -paaaa11__ -A oceanbase
welcome to the oceanbase.  commands end with ; or \g.
your oceanbase connection id is 744006597
server version: oceanbase_ce 4.2.1.8 (r108000022024072217-3149c25ca2dadbb7707686ad02a1367b1b43e0b5) (built jul 23 2024 02:01:58)
copyright (c) 2000, 2018, oceanbase and/or its affiliates. all rights reserved.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
root@oceanbase>
root@oceanbase>
root@oceanbase>show matser status;
error 1064 (42000): you have an error in your sql syntax; check the manual that corresponds to your oceanbase version for the right syntax to use near 'matser status' at line 1
root@oceanbase>
root@oceanbase>show master status;
+------------------+----------+--------------+------------------+-------------------+
| file             | position | binlog_do_db | binlog_ignore_db | executed_gtid_set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      155 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.204 sec)
root@oceanbase>
root@oceanbase>
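
One simple way to confirm that the binlog keeps moving is to read show master status before and after a write and compare the two (file, position) pairs. The sketch below assumes the file names follow the prefix.sequence pattern seen above; binlog_position_key and has_advanced are hypothetical helper names, and the second reading is an invented value for illustration.

```python
def binlog_position_key(file_name, position):
    """Return a sortable key for a (file, position) pair from 'show master status'."""
    # Assumption: file names look like '<prefix>.<sequence>', e.g. mysql-bin.000001.
    prefix, sep, sequence = file_name.rpartition(".")
    if not sep or not sequence.isdigit():
        raise ValueError(f"unexpected binlog file name: {file_name!r}")
    return (int(sequence), int(position))


def has_advanced(before, after):
    """True when 'after' is strictly later in the binlog than 'before'."""
    return binlog_position_key(*after) > binlog_position_key(*before)


before = ("mysql-bin.000001", 155)   # reading shown above
after = ("mysql-bin.000001", 412)    # invented later reading, for illustration only
print(has_advanced(before, after))
```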



Data synchronization

Create the business table test_cdc in the OB tenant

root@oceanbase>
root@oceanbase>use test;
database changed
root@test>
root@test>
root@test>create table `test_cdc` (
    ->       `id` int not null auto_increment,
    ->        `name` varchar(255) default null,
    ->        primary key (`id`)
    ->     )
    -> ;
query ok, 0 rows affected (0.111 sec)
root@test>
root@test>show tables;
+----------------+
| tables_in_test |
+----------------+
| test_cdc       |
+----------------+
1 row in set (0.004 sec)
root@test>



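Note:

The binlog service exists to be fully compatible with MySQL, so the synchronization table should be created with the mysql-cdc table syntax.

In the with options of the MySQL CDC table, set the address and port to those of OBProxy, and set the username and password to those of the tenant for which the binlog was created.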


flink sql> create table test_flink_cdc (
>   id int,
>   name string,
>   primary key(id)  not enforced
> ) with (
>   'connector' = 'mysql-cdc',
>   'hostname' = '10.0.0.65',
>   'port' = '2883',
>   'username'='root@binlogtest#metadb',
>   'password'='aaaa11__',
>   'database-name'='test',
>   'table-name'='test_cdc'
> );
>
[info] execute statement succeed.
flink sql>
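
The mapping described in the note (OBProxy address and port, tenant credentials in the user@tenant#cluster form) can be captured in a small generator. This is a minimal sketch, assuming the option names used in the statement above; build_mysql_cdc_ddl is a hypothetical helper and it does not escape single quotes in values.

```python
def build_mysql_cdc_ddl(flink_table, columns, primary_key, proxy_host, proxy_port,
                        user, tenant, cluster, password, database, source_table):
    """Render a Flink SQL mysql-cdc table definition that points at OBProxy.

    Hypothetical helper: it only builds text, using the same options as the
    statement above. Values containing single quotes are not handled.
    """
    options = {
        "connector": "mysql-cdc",
        "hostname": proxy_host,                    # OBProxy address, not the binlog server
        "port": str(proxy_port),                   # OBProxy port
        "username": f"{user}@{tenant}#{cluster}",  # tenant that owns the binlog instance
        "password": password,
        "database-name": database,
        "table-name": source_table,
    }
    column_lines = "\n".join(f"  {name} {sql_type}," for name, sql_type in columns)
    option_lines = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return (
        f"create table {flink_table} (\n"
        f"{column_lines}\n"
        f"  primary key ({primary_key}) not enforced\n"
        f") with (\n"
        f"{option_lines}\n"
        f");"
    )


ddl = build_mysql_cdc_ddl(
    flink_table="test_flink_cdc",
    columns=[("id", "int"), ("name", "string")],
    primary_key="id",
    proxy_host="10.0.0.65",
    proxy_port=2883,
    user="root",
    tenant="binlogtest",
    cluster="metadb",
    password="aaaa11__",
    database="test",
    source_table="test_cdc",
)
print(ddl)
```

Deriving the username from user, tenant, and cluster keeps the part that differs from a plain MySQL source in one place.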



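Because the OB source table test_cdc has no data yet, querying the synchronization table in Flink SQL at this point returns no rows and stays in a waiting state.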



Insert data into the OB source table

Now insert rows into the OB source table test_cdc.


root@test>insert into test_cdc values (001, 'test01');
query ok, 1 row affected (0.012 sec)
root@test>insert into test_cdc values (002, 'test02');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (003, 'test03');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (004, 'test04');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (005, 'test05');
query ok, 1 row affected (0.003 sec)
root@test>insert into test_cdc values (006, 'test06');
query ok, 1 row affected (0.003 sec)
root@test>


Querying the synchronization table in the Flink SQL client now shows the newly inserted rows, so synchronization succeeded.



The Flink UI shows the job that Flink SQL just created to receive the binlog; the RUNNING status means it is running normally.




Update data in the OB source table

root@test>update test_cdc set name='testbinlog' where name='test06';
query ok, 1 row affected (0.005 sec)
rows matched: 1  changed: 1  warnings: 0
root@test>

Querying the synchronization table in the Flink SQL client now shows the updated row, so synchronization succeeded.
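
Flink represents such changes as a changelog whose rows carry a kind: +I (insert), -U (update before), +U (update after), and -D (delete). The sketch below is not Flink code; it is a plain Python illustration, with the hypothetical function apply_changelog, of how the inserts and the update from this test fold into the final table contents.

```python
def apply_changelog(events):
    """Fold (kind, id, name) changelog events into a {id: name} table."""
    table = {}
    for kind, row_id, name in events:
        if kind in ("+I", "+U"):
            table[row_id] = name          # insert, or the new image of an update
        elif kind in ("-U", "-D"):
            table.pop(row_id, None)       # old image of an update, or a delete
        else:
            raise ValueError(f"unknown row kind: {kind!r}")
    return table


# Six inserts followed by the update of test06, as in the sessions above.
events = [("+I", i, f"test{i:02d}") for i in range(1, 7)]
events += [("-U", 6, "test06"), ("+U", 6, "testbinlog")]

table = apply_changelog(events)
print(table[6])
```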



Binlog instance high availability

Drop one binlog instance

[root@observer062 ~]# mysql -h127.0.0.1 -P2983
welcome to the mysql monitor.  commands end with ; or \g.
your mysql connection id is 39
server version: 5.7.35-obs
copyright (c) 2000, 2024, oracle and/or its affiliates.
oracle is a registered trademark of oracle corporation and/or its
affiliates. other names may be trademarks of their respective
owners.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
mysql>
mysql>
mysql> show binlog instances;
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
| name       | ob_cluster | ob_tenant  | ip        | port | zone | region | group | running | state   | obcdc_running | obcdc_state | service_mode | convert_running | convert_delay | convert_rps | convert_eps | convert_iops | dumpers | version                                        | odp_addr |
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
| ihiclibe34 | metadb     | binlogtest | 10.0.0.62 | 8102 |      |        |       | yes     | running | yes           | running     | enabled      | yes             |           408 |           1 |           0 |            0 |       1 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null     |
| xebn9ehbo2 | metadb     | binlogtest | 10.0.0.62 | 8103 |      |        |       | yes     | running | yes           | running     | enabled      | yes             |           268 |           1 |           0 |            0 |       0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null     |
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
2 rows in set (0.21 sec)
mysql>
mysql>
mysql>
mysql>
mysql> drop binlog instance ihiclibe34 force;
query ok, 0 rows affected (0.01 sec)
mysql> show binlog instances;
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
| name       | ob_cluster | ob_tenant  | ip        | port | zone | region | group | running | state   | obcdc_running | obcdc_state | service_mode | convert_running | convert_delay | convert_rps | convert_eps | convert_iops | dumpers | version                                        | odp_addr |
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
| xebn9ehbo2 | metadb     | binlogtest | 10.0.0.62 | 8103 |      |        |       | yes     | running | yes           | running     | enabled      | yes             |           460 |           1 |           0 |            0 |       0 | 4.0.1-92748c6c59534426d187977b13b61b7cdf4fa200 | null     |
+------------+------------+------------+-----------+------+------+--------+-------+---------+---------+---------------+-------------+--------------+-----------------+---------------+-------------+-------------+--------------+---------+------------------------------------------------+----------+
1 row in set (0.01 sec)
mysql>


Verify data synchronization

After dropping one binlog instance, continue inserting new rows into the source table.

[root@server065 ~]# obclient -h10.0.0.65 -P2883 -uroot@binlogtest#metadb -paaaa11__ -A oceanbase
welcome to the oceanbase.  commands end with ; or \g.
your oceanbase connection id is 743998788
server version: oceanbase_ce 4.2.1.8 (r108000022024072217-3149c25ca2dadbb7707686ad02a1367b1b43e0b5) (built jul 23 2024 02:01:58)
copyright (c) 2000, 2018, oceanbase and/or its affiliates. all rights reserved.
type 'help;' or '\h' for help. type '\c' to clear the current input statement.
root@oceanbase>use test;
database changed
root@test>
root@test>
root@test>
root@test>insert into test_cdc values (007, 'test07');
query ok, 1 row affected (0.002 sec)
root@test>insert into test_cdc values (008, 'test08');
query ok, 1 row affected (0.002 sec)
root@test>insert into test_cdc values (009, 'test09');
query ok, 1 row affected (0.002 sec)
root@test>


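Log in to the Flink CDC client and query the synchronization table with select: the rows newly added to the source table have been synchronized to the downstream Flink CDC table, which demonstrates the high availability of multiple binlog instances.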
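
For larger tables, eyeballing the query result does not scale, so the same check can be expressed as a comparison of the two row sets. The following is a minimal sketch: diff_rows is a hypothetical helper, and it assumes both sides have already been fetched into {primary key: value} dictionaries by whatever client is available.

```python
def diff_rows(source_rows, sink_rows):
    """Compare two {primary_key: row} mappings and report the differences."""
    missing = sorted(key for key in source_rows if key not in sink_rows)
    extra = sorted(key for key in sink_rows if key not in source_rows)
    mismatched = sorted(
        key for key in source_rows
        if key in sink_rows and source_rows[key] != sink_rows[key]
    )
    return {"missing": missing, "extra": extra, "mismatched": mismatched}


# Expected state after this test: ids 1 to 9, with id 6 updated to 'testbinlog'.
source = {i: f"test{i:02d}" for i in range(1, 10)}
source[6] = "testbinlog"

sink = dict(source)          # a sink that has caught up completely
report = diff_rows(source, sink)
print(report)

lagging_sink = {key: value for key, value in source.items() if key != 9}
print(diff_rows(source, lagging_sink)["missing"])
```

An empty report means the sink matches the source; a non-empty missing list right after a failover would suggest the surviving instance has not caught up yet.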



Log in to the Flink web UI; the synchronization job is still running normally.




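Summary

1. The binlog service exists to be fully compatible with MySQL, so the synchronization table in Flink should be created with the Flink MySQL CDC table syntax.

2. In the with options of the MySQL CDC table, set the address and port to those of OBProxy, and set the username and password to those of the tenant for which the binlog was created.

3. When creating a binlog instance, specify the replicate num parameter to create multiple instances and ensure high availability of the binlog service.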
