前言 FlinkCDC是一款基于ChangeDataCapture(CDC)技术的数据同步工具,可以用于将关系型数据库中的数据实时同步到Flink流处理中进行实时计算和分析,下图来自官网的介绍。 下图1是FlinkCDC与其它常见开源CDC方案的对比: 可以看见的是相比于其它开源产品,FlinkCDC不仅支持增量同步,还支持全量全量增量的同步,同时FlinkCDC还支持故障恢复(基于检查点机制实现),能够快速恢复数据同步的进度,并且支持的数据源也很丰富〔2〔(在2。3版本已支持MongoDB、MySQL、OceanBase、Oracle、PostgressSQL、SQLServer、TiDB、Db2等数据源)。 本文将介绍FlinkCDC在数据同步和故障恢复等方面的内容(以MySQL和Oracle为例),同时完整代码也已上传到GitHub。效果展示MySQL Oracle(相比MySQL延迟会稍高) 数据库配置MySQL(5。7) 修改my。cnf配置文件(Windows下是my。ini文件),增加以下配置内容:〔mysqld〕开启binloglogbinmysqlbin选择ROW模式binlogformatROW对于MySQL集群,不同节点的serverid必须不同serverid1过期时间expirelogsdays30 Tips:修改完成后需要重启MySQL服务 建库建表:建库建表createtableflink。user(idbigint(20)notnull,usernamevarchar(20)defaultnull,passwordvarchar(63)defaultnull,statusint(2)defaultnull,createtimedatetimedefaultnull,primarykey(id))ENGINEInnoDBdefaultCHARSETutf8mb4; 创建用户并授权:创建用户flinkCREATEUSERflinkIDENTIFIEDBY授权GRANTSELECT,RELOAD,SHOWDATABASES,REPLICATIONSLAVE,REPLICATIONCLIENTON。TO将flink库的所有权限授权给flink用户GRANTALLPRIVILEGESONflink。TO刷新权限FLUSHPRIVILEGES;Oracle(11g) 以DBA身份连接:SID需要根据实际情况进行设置,比如:XE。exportORACLESIDSIDsqlplusnologCONNECTsysmanagerASSYSDBA 配置日志:altersystemsetdbrecoveryfiledestsize20G;日志文件的地址可以根据自己的情况进行设置 确认是否配置成功: 创建用户并授权:CREATEUSERflinkIDENTIFIEDBYGRANTCREATESESSIONTOGRANTFLASHBACKANYTABLETOGRANTSELECTANYTABLETOGRANTSELECTCATALOGROLETOGRANTEXECUTECATALOGROLETOGRANTSELECTANYTRANSACTIONTOGRANTCREATETABLETO 建表并增加日志记录:建表CREATETABLEflink。user(idNUMBERNOTNULL,usernameVARCHAR2(20),passwordVARCHAR2(63),statusINTEGER,createtimeTIMESTAMP,PRIMARYKEY(id));日志配置ALTERTABLEflink。userADDSUPPLEMENTALLOGDATA(ALL)COLUMNS;代码配置运行环境 依赖 版本 Java 17hrflinkconnector 2。1。0 flink 1。13。0 maven 3。6。2连接配置flinkcdc:datasource:默认类型为MySQLaddr:localhost:3306database:flinkusername:flinkpassword:flinktablelist:user Tips:关于数据源的连接完整配置属性可参考DataSourceProperties。java文件,关于检查点的配置可参考CheckPointProperties。java文件恢复点配置 为了实现故障恢复(应用停止运行过程中数据库有增删改操作的情况)的情况,需要在代码中进行恢复点的相关配置:获取配置的恢复点路径,首次运行不存在会默认进行创建varsaveDircheckPointProperties。getSaveDir();varfoldernewFile(saveDir);if(!folder。exists()!folder。isDirectory()){if(!folder。mkdirs()){thrownewIllegalStateException(文件夹创建失败);}}vardataSourceTypedataSourceProperties。getType()。name()。toLowerCase();vardataSourceSaveDirsaveDirFile。separatordataSourceTvarsavepointDirSavepointUtils。getSavepointRestore(dataSourceSaveDir);varconfigurationnewConfiguration();if(savepointDir!null){设置恢复点路径varsavepointRestoreSettingsSavepointRestoreSettings。forPath(savepointDir);SavepointRestoreSettings。toConfiguration(savepointRestoreSettings,configuration);}启用检查点并设置检查点的保存路径varenvStreamExecutionEnvironment。getExecutionEnvironment(configuration);env。enableCheckpointing(checkPointProperties。getInterval(),CheckpointingMode。EXACTLYONCE);varcheckpointConfigenv。getCheckpointConfig();checkpointConfig。setCheckpointStorage(checkPointProperties。getStorageType()。getPrefix()dataSourceSaveDir);通用注意点 为了避免数值类型显示是一堆字符串,需要增加以下配置:详见https:github。comververicaflinkcdcconnectorswikiFAQ(ZH)E9809AE794A8faqQ5prop。setProperty(bigint。unsigned。handling。mode,long);prop。setProperty(decimal。handling。mode,double);ORACLE配置注意点 为了避免日志增长过快以及读取日志满的问题,需要增加以下配置:详见https:github。comververicaflinkcdcconnectorswikiFAQ(ZH)oraclecdcfaqQ1prop。setProperty(log。mining。strategy,onlinecatalog);prop。setProperty(log。mining。continuous。mine,true); 对于Oracle11g,连接配置中需要增加:详见https:github。comververicaflinkcdcconnectorswikiFAQ(ZH)oraclecdcfaqQ2prop。setProperty(database。tablename。case。insensitive,false);项目运行及使用介绍下载代码 由于本人将博客相关的示例代码都集中到了一个仓库,因此如果不想拉取整个仓库,推荐使用GitZipforgithub这个插件,就可以只下载部分的文件(选中指定文件后点击右下角的下载按钮): 使用介绍 对于需要监控的表,只需要创建相应的实体类,并新建一个类继承AbstractMessageListener(可重写其中的create、delete、update、read等方法处理相应的事件)即可,其中FlickCdcMessageListener注解内的参数填相应的表名即可监听相应的表变更事件(同时需要在yaml文件中tableList中增加要监听的表,如果是Oracle数据库还需要增加日志配置):importcn。butterfly。flinkcdc。annotation。FlickCdcMessageLimportcn。butterfly。flinkcdc。pojo。Uimportlombok。extern。slf4j。Slf4j;importorg。springframework。stereotype。C用户表消息监听器authorzjwdate20230314Slf4jComponentFlickCdcMessageListener(user)publicclassUserMessageListenerextendsAbstractMessageListenerUser{Overridepublicvoidcreate(Useruser){log。info(新增用户:{},user);}}其它注意点FlinkCDC默认的同步策略是第一次运行先进行全量同步,后续即可进行增量读取,因此表数据量比较大的时候,重写AbstractMessageListenerread方法时需要特别注意处理大量数据的情况。由于FlinkCDC是根据数据库的事务日志来获取数据更改的,如果恢复点之后发生了数据更改,那么在恢复点之后的数据将被重复读取,因此需要考虑重复读取的情况。总结 本文简单介绍了FlinkCDC的数据同步和故障恢复方面的内容,对相关基础知识进行了省略