乐知付加密服务平台

如果你有资源, 平台可以帮你实现内容变现, 无需搭建知识付费服务平台。

点击访问官方网站 https://lezhifu.cc

扫码关注公众号 乐知付加密服务平台-微信公众号
Flink实时数仓第二篇 | chenzuoli's blog

Flink实时数仓第二篇

今天说下我在数据接入过程中遇到的一个奇葩的数据一致性的问题,就是在flink删除hbase数据的时候,返回了上一版本的数据,而不是直接删除。

环境

1
2
3
4
5
6
centos7.4
jdk1.8
flink 1.12.1
hbase 1.4.13
hadoop 2.7.4
zookeeper 3.4.10

问题

通过mysql-cdc和hbase-1.4 connector,直接将数据写入hbase,两个sql如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// create mysql-cdc table
CREATE TABLE packageSourceTable (
ID int,
PACKAGE_ID int,
GAME_ID int,
CHANNEL_ID int,
PLATFORM int,
POPULARIZE int,
COMPANY_ID int
) with (
'connector' = 'mysql-cdc',
'hostname' = 'xxx',
'port' = '3306',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'game_platform',
'table-name' = 'packages',
'server-time-zone' = 'Asia/Shanghai'
);

// create hbase table
CREATE TABLE hTable (
rowkey INT,
column_family ROW<
ID int,
PACKAGE_ID int,
GAME_ID int,
CHANNEL_ID int,
PLATFORM int,
POPULARIZE int,
COMPANY_ID int>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);

// insert into hbase
INSERT INTO hTable
select
id as rowkey,
row(ID, PACKAGE_ID, GAME_ID, CHANNEL_ID, PLATFORM, POPULARIZE, COMPANY_ID);

ok,上述三个sql分别在sql-client中执行,就可以直接把mysql的更新、删除、插入操作实时同步到hbase了,然后如果需要的话,建一个hive外表,通过sql做数据分析用。

在进行数据一致性校验的时候,发现在mysql端,如果某一条数据被更新了好几次,然后再删除,在hbase这边出现的情况是,更新操作正常,删除操作hbase直接回到最后一次更新之前的记录,非常的奇怪。

于是试了hbase shell的更新删除和java api的更新删除操作,均不会出现此类情况。

于是把重心转移到hbase身上,因为hbase存储机制有一些特殊,LSM架构让Hbase存在一些特殊情况,每条增删改记录都是在原数据的基础上进行拼接了新的记录,并根据主键做好标记(Insert Delete_Before Update_Before Update_After),每个Region包含多个Store,Store中包含一个MemStore和多个StoreFile,一个列簇对应一个Store,数据的增删改先存储在MemStore中,MemStore达到一定的数据量后,会flush到StoreFile,当StoreFile达到一定数据量和大小后,会进行compact,此时才是真正的合并数据,将delete的和更新前的,版本失效、过期的数据移除掉。

当时是这样分析的,flink删除hbase数据时,只删除了memstore中的最高版本的数据,低版本的数据仍然还在内存中,于是建hbase表时指定版本为1,只有一个版本,但是仍然出现以上问题。
于是继续猜测,可能是因为memstore没有flush和compact,导致内存中一直存在更新和删除的历史记录,于是在做完多次更新之后,进行一次手动flush和compact操作,果然flink可以删除成功了。

那怎么办呢?就去研究flink hbase connector如何进行flush compact操作了,发现了三个参数,跟flush有关,如下:

parameterrequireddefaulttypedescription
sink.buffer-flush.max-sizeoptional2mbMemorySizeWriting option, maximum size in memory of buffered rows for each writing request. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to ‘0’ to disable it.
sink.buffer-flush.max-rowsoptional1000IntegerWriting option, maximum number of rows to buffer for each writing request. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to ‘0’ to disable it.
sink.buffer-flush.intervaloptional1sDurationWriting option, the interval to flush any buffered rows. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to ‘0’ to disable it. Note, both ‘sink.buffer-flush.max-size’ and ‘sink.buffer-flush.max-rows’ can be set to ‘0’ with the flush interval set allowing for complete async processing of buffered actions.

于是加上了这几个参数,设置的是每来一条数据就刷一条,但是不管用,哭。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// create hbase table
CREATE TABLE hTable (
rowkey INT,
column_family ROW<
ID int,
PACKAGE_ID int,
GAME_ID int,
CHANNEL_ID int,
PLATFORM int,
POPULARIZE int,
COMPANY_ID int>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181',
'sink.buffer-flush.max-rows' = '1',
'sink.buffer-flush.interval' = '1s'
);

最后实在没办法,想到既然自己写的java api操作hbase增删改数据一致性没问题,那么是不是看看flink hbase connector如何操作的,改掉它不就行了,connector也是java写的。
于是走到了修改源码的道路,按照自己写的java api操作hbase 的方法修改了源码,重新打包,但是依然不见效果,绝了,无语了,放弃了。

解决方案

上面的问题,一度困扰了我三天时间,都怀疑这条路是否能行得通了,最后终于放弃了,转向研究其他实时数仓路线,了解到Flink+iceberg能做到近实时,于是开始尝试了,iceberg目前还在发展中,没有稳定版本,我现在看的iceberg适配flink1.11.x,前面我用的是flink1.12.1,所以需要降低版本。
iceberg_flink
于是下载了flink1.11.3,停掉了原来的flink,在同一台机器上,修改了FLINK_HOME,启动了1.11.3的flink,按照iceberg官档操作起来了。
晚上回去还是有些对Hbase路线不甘心,想去一探究竟到底是啥原因引起的,继续看源码,继续尝试,上次修改了源码,不知道有没有生效,这次打上日志,运行看看是不是真的走这里,进行的删除操作。但是还原回flink1.12.1版本出问题了。之前使用的1.11.3版本,导致zookeeper和hdfs存储的是低版本数据,无法升级了,想着删除zookeeper的元数据,但是进去后发现flink相关的数据,啥也没有,怎么会这么奇怪呢?

迫不得已,新建了一个zookeeper,才让flink1.12.1启动起来了,然后修改了connector的源码打上日志信息,但是日志也没打印出来,啊啊啊啊啊啊啊啊啊啊,到底是走的哪里呢,但是奇迹出现了,之前的问题突然消失了,flink能正常删除hbase数据了,而且没有版本问题,神奇。
最后分析了一下,hbase的使用zookeeper,主要是来分配region和regionserver监控的,flink使用zookeeper,主要记录任务的运行状况和checkpoint信息,当checkpoint信息在zookeeper中存储出现问题的时候,删除自然会出问题了。
所以最后的解决方案是,为flink重新配了一个zookeeper。

ok,完成。


书山有路勤为径,学海无涯苦作舟。

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:
程序员写书
程序员写书
电巴克宠物
电巴克宠物
一起学习,一起进步。

-------------本文结束感谢您的阅读-------------