网站备案收费,方法seo,在上海做兼职在哪个网站好,招工招聘人在附近接上文#xff1a;一文说清flink从编码到部署上线 环境#xff1a;①操作系统#xff1a;阿里龙蜥 7.9#xff08;平替CentOS7.9#xff09;#xff1b;②CPU#xff1a;x86#xff1b;③用户#xff1a;root。 预研初衷#xff1a;现在很多项目有国产化的要求#… 接上文一文说清flink从编码到部署上线 环境①操作系统阿里龙蜥 7.9平替CentOS7.9②CPUx86③用户root。 预研初衷现在很多项目有国产化的要求操作系统、数据库需要国产化然后就想着找既能开源免费又能很好的兼容MySQL还能很好支持flink。然后就在信创目录找到OceanBase数据库。 flink探索flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南官网论坛感觉比较靠谱然而发现按照说明引入依赖后相关语法是不支持的。也在网上找了比较多的其它资料中间比较坎坷都未解决不再赘述。最后转换思路既然OceanBase支持MySQL binlog那就把OceanBase当MySQL用使用MySQL CDC是不是可以最后问题得到解决。下面展开说明。 1.OceanBase部署
1.1 obd 部署
官方文档oceanbase部署
注意①这个地方最好选择obd 图形化部署docker部署虽然简单但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了好多yum源不能用了。可以使用“阿里龙蜥 7.9”。
部署完记得保存相关账号信息供参考
[{component: oceanbase-ce,access_url: 10.86.97.168:2881,user: root,password: pwd,connect_url: obclient -h10.86.97.168 -P2881 -uroot -ppwd -Doceanbase -A},{component: obproxy-ce,access_url: 10.86.97.168:2883,user: rootproxysys,password: Y6.B4s)pt,connect_url: obclient -h10.86.97.168 -P2883 -urootproxysys -pY6.B4s)pt -Doceanbase -A \n},{component: ocp-express,access_url: 10.86.97.168:8180,user: admin,password: DSxF-{odkdX-bmL6fjrF2{3mLL,connect_url: http://10.86.97.168:8180}
]这个“ocp-express”是个监控页面能看到集群信息访问“http://10.86.97.168:8180”
1.2 常用命令
启动obd cluster start myoceanbase改成具体集群名称常用命令
# 查看集群列表
obd cluster list
# 查看集群状态以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群以部署名为 obtest 为例
obd cluster destroy obtest2.obbinlog
2.1 部署
官方文档obbinlog部署
部署过程中会遇到这个错误“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”
解决方法修改“/etc/yum.repos.d/OceanBase.repo”中“$releasever”改为“7”。 解决完上面这个错误其它地方就比较顺利了。
查看是否安装成功
netstat -anp | grep 29832.2 创建租户
由于“不可以用rootsys创建binlog任务”所以要创建租户。
1.查看所有的租户信息
SELECT * FROM oceanbase.DBA_OB_TENANTS;2.查看resource pool
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;3.创建“资源规格UNIT”
CREATE RESOURCE UNIT S1_unit_flink_testMEMORY_SIZE 2G,MAX_CPU 1, MIN_CPU 1,LOG_DISK_SIZE 6G,MAX_IOPS 10000, MIN_IOPS 10000, IOPS_WEIGHT1;4.创建resource pool仅 sys 租户的 root 用户rootsys可以创建资源池其他租户不支持创建资源池
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNITsys_unit_config, UNIT_NUM1, ZONE_LIST(zone1);5.创建租户创建一个名为 flink_test_tenant 的租户(默认为 MySQL 模式租户)副本数为1资源池指定为 flink_test_tenant_pool_01Primary Zone 为 zone1允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant PRIMARY_ZONEzone1, RESOURCE_POOL_LIST(tenant_pool_flink_test) set OB_TCP_INVITED_NODES%;6.使用新创建的租户管理员登录
用户名rootflink_test_tenant
密码默认为空有需要可以自己设置密码7.创建用户 CREATE USER 权限较大默认仅集群管理员和租户管理员拥有此系统权限
CREATE USER test IDENTIFIED BY pwd;
GRANT ALL ON *.* TO test;8.常用命令
其它命令删除用户
drop user test;
删除“资源规格”
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;2.3 创建数据库
账号testflink_test_tenant 密码pwd。 使用上面账号登录oceanbase后创建数据库。
CREATE DATABASE IF NOT EXISTS flink_test;
USE flink_test;SET FOREIGN_KEY_CHECKS0;-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS rv_table;
CREATE TABLE rv_table (dt varchar(10) NOT NULL ,uuid varchar(30) DEFAULT NULL,report_time bigint(20) DEFAULT NULL,PRIMARY KEY (dt)
) ENGINEInnoDB DEFAULT CHARSETutf8mb4;2024-12-25 uid20241225 1735090201740
2024-12-26 uid20241226 17350902017412.4 创建binlog
进入binlog server服务
cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983创建binlog
CREATE BINLOG FOR TENANT myoceanbase.flink_test_tenant TO USER root PASSWORD pwd WITH CLUSTER URL http://10.86.97.168:8080/services?ActionObRootServiceInfoObClustermyoceanbase,REPLICATE NUM 2;2.5 配置ODP
账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -urootproxysys -pY6.B4s)pt -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip10.86.97.168:2983;2.6 验证结果
obclient -h10.86.97.168 -P2883 -urootflink_test_tenant -p -Doceanbase -A
默认密码为空到输密码时直接回车就行。SHOW MASTER STATUS;SHOW BINLOG EVENTS;2.7 常见问题
问题描述CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”
查看日志“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”提示没有binlog相关表。
日志路径/home/ds/oblogproxy/log/logproxy.log
解决重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json” 相关的数据表会重建。然后再执行“CREATE BINLOG”即可。
或者说应该先创建数据库再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表如下
3. fink CDC
3.1 核心代码
package com.zl.oceanbase;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;/*** 就当成MySQL使用就行。*/
public class OceanBaseCDCLikeMySQLExample {public static void main(String[] args) throws Exception {ListString SYNC_TABLES Arrays.asList(flink_test.rv_table);MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(10.86.97.168).port(2883)// oceanbase安装时obproxy-ce组件端口.databaseList(flink_test).tableList(String.join(,, SYNC_TABLES)).username(rootflink_test_tenant).password()// 记得修改为实际密码.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();// 配置运行环境并行度1StreamExecutionEnvironment env EnvUtil.setFlinkEnv(1);// 程序间隔离每个程序单独设置env.getCheckpointConfig().setCheckpointStorage(hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample);// 如果不能正常读取mysql的binlog①可能是mysql没有打开binlog或者mysql版本不支持当前在mysql5.7.20环境下功能正常// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL Source).setParallelism(1).print();env.execute(Print MySQL Snapshot Binlog);}
}3.2 flink web 3.3 控制台日志 3.4 完整代码
完成代码见flink-cdc-mysql
4.扩展
本文主要基于oceanbase oblogproxy的binlog模式。 其实oblogproxy还支持CDC模式详见官网文档CDC模式。