乐知付加密服务平台

如果你有资源, 平台可以帮你实现内容变现, 无需搭建知识付费服务平台。

点击访问官方网站 https://lezhifu.cc

扫码关注公众号 乐知付加密服务平台-微信公众号
Spark累加器的使用方法 | chenzuoli's blog

Spark累加器的使用方法

运行spark程序,使用到了累加器Accumulator,目前使用的是spark2.3.0,累加器Accumulator的定义方法变了,具体查看详细内容。

之前spark1.6.0时,累加器的定义及使用方式为:

1
2
3
4
5
6
Accumulator<Integer> accum = sc.accumulator(0);
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10

在spark2.3.0中,累加器的定义方式应该为:

1
2
3
4
5
6
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10

之前的方式已被标记为Deprecated。
也可以如此,先定义,在注册到SparkConf:

1
2
3
4
LongAccumulator countDftResult = new LongAccumulator();
LongAccumulator countFailed = new LongAccumulator();
sc.register(countDftResult); // 注册累加器
sc.register(countFailed);

如果不注册,会出现Accumulator must be registered before send to executor异常。
到这里就基本可以使用累加器了,谢谢大家,如果有什么问题,请留言。


书山有路勤为径,学海无涯苦作舟。

欢迎关注微信公众号:【程序员写书】
程序员写书

喜欢宠物的朋友可以关注:【电巴克宠物Pets】
电巴克宠物

一起学习,一起进步。

-------------本文结束感谢您的阅读-------------