乐知付加密服务平台

如果你有资源, 平台可以帮你实现内容变现, 无需搭建知识付费服务平台。

点击访问官方网站 https://lezhifu.cc

扫码关注公众号 乐知付加密服务平台-微信公众号
Flink-connector中kafka和upsertkafka的介绍 | chenzuoli's blog

Flink-connector中kafka和upsertkafka的介绍

今天来说下flink sql中常用到的connector:kafka,它承接了实时的消息数据,进行处理,当然,这些消息的特点有可能不一样,怎样处理,得到实时的结果,提供给分析、运营、营销等等。下面来看看具体有什么区别,怎么使用。

kafka

kafka中的实时消息,它也可以是关系型数据库的changelog(具有insert/update/delete属性),我们知道每一条消息的属性之后,就可以确定哪些数据是最新的,哪些数据是被删除的,那么在我们的存储系统中,应该被实时地更新,以便计算的及时,数据的延时性就会更低。

flink_logo

他们俩的依赖都是一样的
依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.3</version>
</dependency>

先来说下kafka connector:

1.创建一个kafka connector(sql)

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', // 消费的offset位置,也可以是latest-offset
'format' = 'csv'
)

这种connector,kafka中的数据是什么样子,KafkaTable表中的数据就是什么样子的,可以执行select语句查看:

1
select * from KafkaTable

接着你就可以对KafkaTable表中的数据进行各种操作了。

再来看下upsertkafka connector

先介绍一下,upsert kafka连接器支持实时消息数据以upsert方式从一个kafka topic中插入到另一个kafka topic中,source为changelog类型kafka, sink为kafka

什么是changelog类型kafka?,就是kafka topic中的消息是带有一个属性的,这个属性标记数据是Insert、update before、update after、delete的,再根据主键来进行对sink kafka topic中的数据进行具体的操作,是插入,还是更新,还是删除。

另外,value为空的消息,将被视为Delete消息。

1.来看看sql的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);

CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP,
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = '...',
'format' = 'json'
);

-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

记住DDL语句中,一定要设置主键Primary key。

上面的pageviews_per_region表中计算的pv,uv结果就是统计结果,当数据源端进行了增删改,对应的pv uv结果就会同步更新,这就是upsert kafka的魅力。

这是基于kafka的统计计算,前提条件是topic pageviews中的数据是changelog格式的。

changelog格式数据怎么来,怎么导入kafka,还有upsert到mysql、hbase呢,且看下回分解。


每天进步一点点,你会成为专家。

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:
程序员写书

喜欢宠物的朋友可以关注:【电巴克宠物Pets】
电巴克宠物

一起学习,一起进步。

-------------本文结束感谢您的阅读-------------