乐知付加密服务平台

如果你有资源, 平台可以帮你实现内容变现, 无需搭建知识付费服务平台。

点击访问官方网站 https://lezhifu.cc

扫码关注公众号 乐知付加密服务平台-微信公众号
Flink实时数仓第一篇数据接入 | chenzuoli's blog

Flink实时数仓第一篇数据接入

最近在做实时数仓相关工作,了解到一些皮毛,但是大致方向还是对的,目前有一些眉目和进展,就跟大家讲讲。今天讲讲实时数据接入吧,怎么将数据实时接入到数据湖或者数据仓库。
flink_logo

来看看流程图:
flink数据接入

  1. 为什么选择Hbase中转,而不是直接入Hive?
    因为Hive不支持更新操作,Hbase可以,当然ES也可以作为中转组件

  2. oracle接入为什么这么复杂?
    因为flink对oracle cdc的支持还在测试中,flink cdc其实封装了debezium和kafka,对于debezium接入oracle数据,还在等debezium出稳定版的oracle接入。我们需要把kafka的数据转换成flink识别的change log,然后才能使用Hbase connector,在hbase中做更新删除操作。

  3. 不支持的cdc数据源怎么办?
    自定义source,只要数据进入到了kafka,一切都好说。

实时数据接入,目标就是将业务库(mysql oracle mongo postgresql等)数据库中的数据拉取到数据仓库中,供实时计算使用,那么如何接入呢,flink提供了许多方式,且看:

  • cdc:sql方式,但是目前只支持mysql和pg,其他数据库正在开发测试中;
  • Api:通过写代码方式接入数据,可以接入许多数据库、消息组件的数据,例如jdbc方式连接的数据库,kafka,rabbitmq等;
  • 自定义source:实现接口SourceFunction

cdc方式全程sql,即可接入到StreamTable中。
一般大公司,为了降低开发成本,提高效率,都希望使用sql方式完成工作,但是flink sql支持不太完善,许多功能需要自己去实现,下面来看看其他数据源如何接入:

  • 比如我们的数据源是从oracle来的,flink暂不支持oracle方式的cdc,那么可以通过oracle ogg将update delete insert类型的change log,写入kafka,然后将kafka中的message,通过udf、udtf方式转化成flink识别的changelog,格式为{“data”: {}, “op”: “”},op有4中类型:+I,-U,+U,-D
  • kafka:kafka connector
  • changelog:kafka changelog connector
  • hbase:hbase connector,目前hbase只支持1.4和2.2版本
  • hive:hive connector
  • es:es connector,不支持source,只支持sink

相关参考链接:
flink_table_connector

如有什么问题,欢迎联系我:
微信:PAIN_7771
qq: 2536888617
邮箱:chenzuoli709@163.com


永远保持思考,居安思危。

欢迎关注微信公众号:【程序员写书】
程序员写书

喜欢宠物的朋友可以关注:【电巴克宠物Pets】
电巴克宠物

一起学习,一起进步。

-------------本文结束感谢您的阅读-------------