今天分享一个最近自己开发的flink-clickhouse-connector。
大家好,我是chenzuoli,一名大数据工程师,工作之余,会找一些自己感兴趣的东西学习,再写一些文章记录一下,就这点爱好。
今天分享的是flink clickhouse connector,目前写了一个clickhouse source connector,有时间跟大家讲解一下如何写的,自定义connector有哪些步骤。
后面再补充一个sink connector。
实现DynamicTableSourceFactory
指定一下connector对应需要的必需参数和可选参数
1  | 
  | 
最主要的是重写这个方法,createDynamicTableSource这个方法要验证参数,并获取表结构等。
实现ScanTableSource
这里主要看你是全表scan还是实现lookup查找,具体根据你指定的参数来查询数据源,主要方法是getScanRuntimeProvider这个。
继承RichSourceFunction
继承这个类的目的就是具体连接clickhouse和查询clickhouse的逻辑实现了,你需要创建clickhouse连接,并根据指定的参数,查询clickhouse,将结果返回。
我自己通过原生的连接clickhouse的方式,实现了一个工具类ClicKHouseUtil,可以参考一下。
配置SPI
resources目录下创建META-INF/services/org.apache.flink.table.factories.Factory文件,文件内容为你实现的TableFactory类名称(包名及名称)
com.czl.flink.connector.factory.ClickHouseDynamicTableFactory
具体参考github源码地址:
https://github.com/chenzuoli/flink-learning/tree/master/flink-mysql-clickhouse/src/main/java/com/czl/flink/connector
Keep reading, Keep writing, Keep coding.
欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:
欢迎交流,这是我的微信:
一起学习,一起进步。
