今天来说下flink sql中常用到的connector:kafka,它承接了实时的消息数据,进行处理,当然,这些消息的特点有可能不一样,怎样处理,得到实时的结果,提供给分析、运营、营销等等。下面来看看具体有什么区别,怎么使用。

kafka中的实时消息,它也可以是关系型数据库的changelog(具有insert/update/delete属性),我们知道每一条消息的属性之后,就可以确定哪些数据是最新的,哪些数据是被删除的,那么在我们的存储系统中,应该被实时地更新,以便计算的及时,数据的延时性就会更低。

他们俩的依赖都是一样的
依赖
1 | <dependency> |
先来说下kafka connector:
1.创建一个kafka connector(sql)
1 | CREATE TABLE KafkaTable ( |
这种connector,kafka中的数据是什么样子,KafkaTable表中的数据就是什么样子的,可以执行select语句查看:
1 | select * from KafkaTable |
接着你就可以对KafkaTable表中的数据进行各种操作了。
再来看下upsertkafka connector
先介绍一下,upsert kafka连接器支持实时消息数据以upsert方式从一个kafka topic中插入到另一个kafka topic中,source为changelog类型kafka, sink为kafka
什么是changelog类型kafka?,就是kafka topic中的消息是带有一个属性的,这个属性标记数据是Insert、update before、update after、delete的,再根据主键来进行对sink kafka topic中的数据进行具体的操作,是插入,还是更新,还是删除。
另外,value为空的消息,将被视为Delete消息。
1.来看看sql的例子
1 | CREATE TABLE pageviews_per_region ( |
记住DDL语句中,一定要设置主键Primary key。
上面的pageviews_per_region表中计算的pv,uv结果就是统计结果,当数据源端进行了增删改,对应的pv uv结果就会同步更新,这就是upsert kafka的魅力。
这是基于kafka的统计计算,前提条件是topic pageviews中的数据是changelog格式的。
changelog格式数据怎么来,怎么导入kafka,还有upsert到mysql、hbase呢,且看下回分解。
每天进步一点点,你会成为专家。
欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:
喜欢宠物的朋友可以关注:【电巴克宠物Pets】
一起学习,一起进步。
