乐知付加密服务平台

如果你有资源, 平台可以帮你实现内容变现, 无需搭建知识付费服务平台。

点击访问官方网站 https://lezhifu.cc

扫码关注公众号 乐知付加密服务平台-微信公众号
自定义实现Flink-ClickHouse-Connector | chenzuoli's blog

自定义实现Flink-ClickHouse-Connector

今天分享一个最近自己开发的flink-clickhouse-connector。

大家好,我是chenzuoli,一名大数据工程师,工作之余,会找一些自己感兴趣的东西学习,再写一些文章记录一下,就这点爱好。

今天分享的是flink clickhouse connector,目前写了一个clickhouse source connector,有时间跟大家讲解一下如何写的,自定义connector有哪些步骤。

后面再补充一个sink connector。

实现DynamicTableSourceFactory

指定一下connector对应需要的必需参数和可选参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

// define all the options statically.
public static final ConfigOption<String> URL = ConfigOptions.key("url")
.stringType()
.defaultValue("jdbc:clickhouse://localhost:8123/default");

public static final ConfigOption<String> TABLENAME = ConfigOptions.key("table-name")
.stringType()
.noDefaultValue();

// username and password is optional.
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
.stringType()
.noDefaultValue();

public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
.stringType()
.noDefaultValue();

最主要的是重写这个方法,createDynamicTableSource这个方法要验证参数,并获取表结构等。

实现ScanTableSource

这里主要看你是全表scan还是实现lookup查找,具体根据你指定的参数来查询数据源,主要方法是getScanRuntimeProvider这个。

继承RichSourceFunction

继承这个类的目的就是具体连接clickhouse和查询clickhouse的逻辑实现了,你需要创建clickhouse连接,并根据指定的参数,查询clickhouse,将结果返回。

我自己通过原生的连接clickhouse的方式,实现了一个工具类ClicKHouseUtil,可以参考一下。

配置SPI

resources目录下创建META-INF/services/org.apache.flink.table.factories.Factory文件,文件内容为你实现的TableFactory类名称(包名及名称)
com.czl.flink.connector.factory.ClickHouseDynamicTableFactory

具体参考github源码地址:
https://github.com/chenzuoli/flink-learning/tree/master/flink-mysql-clickhouse/src/main/java/com/czl/flink/connector


Keep reading, Keep writing, Keep coding.

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:
程序员写书

欢迎交流,这是我的微信:
wechat

一起学习,一起进步。

-------------本文结束感谢您的阅读-------------