8. Flink-CDC

news/2025/2/22 23:42:31

1. Flink-CDC的介绍

Flink-cdc主要是用来同步数据库中的数据,它的主要优势在于基于Flink框架直接用Flink Stream Api 或Flink SQL 直接编程,不需要引入第三方组件

2.Flink-CDC的使用

Flink-cdc在使用上需要注意的点

  • 注意Flink-cdc在2.1版本之前需要导入MySQL的连接包,之后版本不需要,如果环境中有MySQL的连接包需要去除掉
  • 在2.4版本之监控MySQL表需要它有主键,2.4版本开始只需要配置“scan.incremental.snapshot.chunk.key-column”参数即可
  • MySQL CDC Connector在监控多个表的时候,每个表需要指定库名,并用逗号隔开
  • Flink中必须要设置checkpoint,不设置无法正常监控binlog变更日志
    Flink-CDC基于DataStream的使用
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("node2")      //设置MySQL hostname
        .port(3306)             //设置MySQL port
        .databaseList("db1")    //设置捕获的数据库
        .tableList("db1.tbl1,db1.tbl2") //设置捕获的数据表
        .username("root")       //设置登录MySQL用户名
        .password("123456")     //设置登录MySQL密码
        .deserializer(new JsonDebeziumDeserializationSchema()) //设置序列化将SourceRecord 转换成 Json 字符串
        .startupOptions(StartupOptions.initial())
        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpoint
env.enableCheckpointing(5000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySQL Source")
        .setParallelism(4)
        .print();
env.execute();

基于Flink Sql的使用

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
//设置checkpoint
tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 5000L);
tableEnv.executeSql("" +
        "CREATE TABLE mysql_binlog (" +
        " id INT," +
        " name STRING," +
        " age INT," +
        " PRIMARY KEY(id) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'mysql-cdc'," +
        " 'hostname' = 'node2'," +
        " 'port' = '3306'," +
        " 'username' = 'root'," +
        " 'password' = '123456'," +
        " 'database-name' = 'db1'," +
        " 'table-name' = 'tbl1'" +
        ")");
tableEnv.executeSql("select * from mysql_binlog").print();

2.1 Flink-CDC对全量和增量数据的工作原理

并行读取表的全量快照,然后以单并行度方式读取表的binlog进行增量数据的同步

  • 全量同步过程中,它会根据主键把数据分为多个chunk分片,然后分配给多并行度去分别读取这些chunk上的数据,读取快照期间,Flink支持chunk级别的checkpoint,即使在同步的过程中发生故障,也可以做到exactly-once级别的恢复

2.2 Flink-CDC启动模式

启动模式是指程序启动的时候,以怎么的方式监控数据库中的数据,共有如下几种模式

  • initial(默认): 对受监控的库表进行初始快照,并继续读取最新的binlog
  • earliest-offset: 它会跳过快照直接读取最早的binlog日志,它与initial方式区别在于,initial只读取已经操作后(表中现有数据)的数据
  • latest-offset: 不执行快照,从binlog的最新处开始读取增量数据
  • specific-offset: 从指定的binlog位点开始读取,位点可以通过binlog文件名和位置指定
  • timestamp: 从指定的时间戳读取binlog事件

http://www.niftyadmin.cn/n/5862849.html

相关文章

跟着李沐老师学习深度学习(十三)

现代循环神经网络 循环神经网络中梯度异常在实践中的意义引发了一些问题&#xff1a; 早期观测值影响重大&#xff1a;早期观测值对预测所有未来观测值极为重要&#xff0c;如序列中第一个观测值包含校验和&#xff0c;需在序列末尾辨别其是否正确&#xff0c;若无特殊机制存…

sysaux表空间处理流程

1.查看节点1表空间情况 set line 200; set pagesize 20000; set feedback off; col tablespace_name for a20; col c_free_percent for a12; col c_used_percent for a12; col m_free_percent for a12; col m_USED_PERCENT for a12; select d.tablespace_name,round(d…

React 高阶组件的优缺点

React 高阶组件的优缺点 优点 1. 代码复用性高 公共逻辑封装&#xff1a;当多个组件需要实现相同的功能或逻辑时&#xff0c;高阶组件可以将这些逻辑封装起来&#xff0c;避免代码重复。例如&#xff0c;多个组件都需要在挂载时进行数据获取操作&#xff0c;就可以创建一个数…

前端如何转战鸿蒙

前端如何转战鸿蒙系统 在当今技术日新月异的时代&#xff0c;前端开发者们不断探索新的领域和机会。随着鸿蒙系统的崛起&#xff0c;一个全新的生态正等待着前端开发者们去开拓。那么&#xff0c;作为前端开发者&#xff0c;我们为何要转战鸿蒙系统&#xff1f;又该如何顺利转型…

前端面试之Box盒子布局:核心知识与实战解析

目录 引言&#xff1a;布局能力决定前端高度 一、盒模型基础&#xff1a;看得见的像素战争 1. 标准盒模型 vs IE盒模型 2. 核心组成公式 3. 视觉格式化模型 二、传统布局三剑客 1. 浮动布局&#xff08;Float Layout&#xff09; 2. 定位布局&#xff08;Position Layou…

Python实战:Excel中文转拼音工具开发教程

在日常办公中&#xff0c;我们经常需要处理Excel文件&#xff0c;有时候需要将中文转换为拼音缩写以方便检索和使用。今天我将分享一个使用Python开发的小工具&#xff0c;它可以自动将Excel文件中指定列的中文转换为拼音缩写。 C:\pythoncode\new\ConvertExcelcontentToPinyin…

图解MySQL【日志】——Redo Log

Redo Log&#xff08;重做日志&#xff09; 为什么需要 Redo Log&#xff1f; 1. 崩溃恢复 数据库崩溃时&#xff0c;系统通过 Redo Log 来恢复尚未写入磁盘的数据。Redo Log 记录了所有已提交事务的操作&#xff0c;系统在重启后会重做这些操作&#xff0c;以保证数据不会丢…

JUC并发—9.并发安全集合四

大纲 1.并发安全的数组列表CopyOnWriteArrayList 2.并发安全的链表队列ConcurrentLinkedQueue 3.并发编程中的阻塞队列概述 4.JUC的各种阻塞队列介绍 5.LinkedBlockingQueue的具体实现原理 6.基于两个队列实现的集群同步机制 4.JUC的各种阻塞队列介绍 (1)基于数组的阻塞…