Spark的广播变量和累加器使用方法代码示例

发布时间 - 2026-01-11 03:28:39    点击率:

一、广播变量和累加器

通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。

1.1 广播变量:

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。

Spark的动作通过一系列的步骤执行,这些步骤由分布式的shuffle操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

通过在一个变量v上调用SparkContext.broadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。举例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。

1.2 累加器:

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持)

累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。

下面的代码展示了如何把一个数组中的所有元素累加到累加器上:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10

尽管上面的例子使用了内置支持的累加器类型Int,但是开发人员也可以通过继承AccumulatorParam类来创建它们自己的累加器类型。AccumulatorParam接口有两个方法:
zero方法为你的类型提供一个0值。
addInPlace方法将两个值相加。
假设我们有一个代表数学vector的Vector类。我们可以向下面这样实现:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
 def zero(initialValue: Vector): Vector = {
  Vector.zeros(initialValue.size)
 }
 def addInPlace(v1: Vector, v2: Vector): Vector = {
  v1 += v2
 }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在Scala里,Spark提供更通用的累加接口来累加数据,尽管结果的类型和累加的数据类型可能不一致(例如,通过收集在一起的元素来创建一个列表)。同时,SparkContext..accumulableCollection方法来累加通用的Scala的集合类型。

累加器仅仅在动作操作内部被更新,Spark保证每个任务在累加器上的更新操作只被执行一次,也就是说,重启任务也不会更新。在转换操作中,用户必须意识到每个任务对累加器的更新操作可能被不只一次执行,如果重新执行了任务和作业的阶段。

累加器并没有改变Spark的惰性求值模型。如果它们被RDD上的操作更新,它们的值只有当RDD因为动作操作被计算时才被更新。因此,当执行一个惰性的转换操作,比如map时,不能保证对累加器值的更新被实际执行了。下面的代码片段演示了此特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
//在这里,accum的值仍然是0,因为没有动作操作引起map被实际的计算.

二.Java和Scala版本的实战演示

2.1 Java版本:

/**
 * 实例:利用广播进行黑名单过滤!
 * 检查新的数据 根据是否在广播变量-黑名单内,从而实现过滤数据。
 */
public class BroadcastAccumulator {
 /**
  * 创建一个List的广播变量
  *
  */
 private static volatile Broadcast<List<String>> broadcastList = null;
 /**
  * 计数器!
  */
 private static volatile Accumulator<Integer> accumulator = null;
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local[2]").
    setAppName("WordCountOnlineBroadcast");
  JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

  /**
   * 注意:分发广播需要一个action操作触发。
   * 注意:广播的是Arrays的asList 而非对象的引用。广播Array数组的对象引用会出错。
   * 使用broadcast广播黑名单到每个Executor中!
   */
  broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
  /**
   * 累加器作为全局计数器!用于统计在线过滤了多少个黑名单!
   * 在这里实例化。
   */
  accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");

  JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);

  /**
   * 这里省去flatmap因为名单是一个个的!
   */
  JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
   @Override
   public Tuple2<String, Integer> call(String word) {
    return new Tuple2<String, Integer>(word, 1);
   }
  });
  JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
   @Override
   public Integer call(Integer v1, Integer v2) {
    return v1 + v2;
   }
  });
  /**
   * Funtion里面 前几个参数是 入参。
   * 后面的出参。
   * 体现在call方法里面!
   *
   */
  wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
   @Override
   public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
    rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
     @Override
     public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
      if (broadcastList.value().contains(wordPair._1)) {
       /**
        * accumulator不仅仅用来计数。
        * 可以同时写进数据库或者缓存中。
        */
       accumulator.add(wordPair._2);
       return false;
      }else {
       return true;
      }
     };
     /**
      * 广播和计数器的执行,需要进行一个action操作!
      */
    }).collect();
    System.out.println("广播器里面的值"+broadcastList.value());
    System.out.println("计时器里面的值"+accumulator.value());
    return null;
   }
  });

  jsc.start();
  jsc.awaitTermination();
  jsc.close();
 }
 }

2.2 Scala版本

package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
 * Created by lxh on 2016/6/30.
 */
object BroadcastAccumulatorStreaming {
 /**
 * 声明一个广播和累加器!
 */
 private var broadcastList:Broadcast[List[String]] = _
 private var accumulator:Accumulator[Int] = _
 def main(args: Array[String]) {
 val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
 val sc = new SparkContext(sparkConf)
 /**
  * duration是ms
  */
 val ssc = new StreamingContext(sc,Duration(2000))
 // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
 broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
 accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
 /**
  * 获取数据!
  */
 val lines = ssc.socketTextStream("localhost",9999)
 /**
  * 1.flatmap把行分割成词。
  * 2.map把词变成tuple(word,1)
  * 3.reducebykey累加value
  * (4.sortBykey排名)
  * 4.进行过滤。 value是否在累加器中。
  * 5.打印显示。
  */
 val words = lines.flatMap(line => line.split(" "))
 val wordpair = words.map(word => (word,1))
 wordpair.filter(record => {broadcastList.value.contains(record._1)})
 val pair = wordpair.reduceByKey(_+_)
 /**
  * 这个pair 是PairDStream<String, Integer>
  * 查看这个id是否在黑名单中,如果是的话,累加器就+1
  */
/* pair.foreachRDD(rdd => {
  rdd.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(1)
   return true
  } else {
   return false
  }
  })
 })*/
 val filtedpair = pair.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(record._2)
   true
  } else {
   false
  }
  }).print
 println("累加器的值"+accumulator.value)
 // pair.filter(record => {broadcastList.value.contains(record._1)})
 /* val keypair = pair.map(pair => (pair._2,pair._1))*/
 /**
  * 如果DStream自己没有某个算子操作。就通过转化transform!
  */
 /* keypair.transform(rdd => {
  rdd.sortByKey(false)//TODO
 })*/
 pair.print()
 ssc.start()
 ssc.awaitTermination()
 }
}

总结

以上就是本文关于Spark的广播变量和累加器使用方法代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:详解Java编写并运行spark应用程序的方法  、 Spark入门简介等,有什么问题可以随时留言,小编会及时回复大家。感谢朋友们对网站的支持。


# spark  # 累加器的使用  # 广播变量  # 使用  # Spark学习笔记之Spark中的RDD的具体使用  # Spark学习笔记之Spark SQL的具体使用  # Spark学习笔记Spark Streaming的使用  # 初识Spark入门  # 详解Java编写并运行spark应用程序的方法  # Python中用Spark模块的使用教程  # pyspark 读取csv文件创建DataFrame的两种方法  # 使用docker快速搭建Spark集群的方法教程  # Spark学习笔记(一)Spark初识【特性、组成、应用】  # 累加器  # 可以通过  # 在这里  # 器上  # 有效地  # 序列化  # 它会  # 创建一个  # 自己的  # 机器上  # 的是  # 是一个  # 有什么  # 几个  # 多个  # 就不  # 两种  # 还不  # 计时器  # 感兴趣 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: 作用域操作符会触发自动加载吗_php类自动加载机制与::调用【教程】  图册素材网站设计制作软件,图册的导出方式有几种?  Laravel Eloquent模型如何创建_Laravel ORM基础之Model创建与使用教程  佐糖AI抠图怎样调整抠图精度_佐糖AI精度调整与放大细化操作【攻略】  如何在腾讯云服务器上快速搭建个人网站?  Laravel如何使用Sanctum进行API认证?(SPA实战)  Laravel怎么集成Log日志记录_Laravel单文件与每日日志配置及自定义通道【详解】  Laravel怎么配置.env环境变量_Laravel生产环境敏感数据保护与读取【方法】  如何在IIS中新建站点并配置端口与物理路径?  长沙企业网站制作哪家好,长沙水业集团官方网站?  Laravel定时任务怎么设置_Laravel Crontab调度器配置  Laravel与Inertia.js怎么结合_使用Laravel和Inertia构建现代单页应用  Laravel的辅助函数有哪些_Laravel常用Helpers函数提高开发效率  Java遍历集合的三种方式  如何制作新型网站程序文件,新型止水鱼鳞网要拆除吗?  JavaScript如何操作视频_媒体API怎么控制播放  Google浏览器为什么这么卡 Google浏览器提速优化设置步骤【方法】  html5源代码发行怎么设置权限_访问权限控制方法与实践【指南】  Laravel Session怎么存储_Laravel Session驱动配置详解  手机网站制作与建设方案,手机网站如何建设?  高防服务器租用指南:配置选择与快速部署攻略  Laravel如何生成PDF或Excel文件_Laravel文档导出工具与使用教程  如何快速选择适合个人网站的云服务器配置?  Laravel如何与Pusher实现实时通信?(WebSocket示例)  如何在阿里云购买域名并搭建网站?  Laravel如何发送系统通知_Laravel Notifications实现多渠道消息通知  php485函数参数是什么意思_php485各参数详细说明【介绍】  标题:Vue + Vuex + JWT 身份认证的正确实践与常见误区解析  Laravel怎么实现搜索高亮功能_Laravel结合Scout与Algolia全文检索【实战】  微信小程序 闭包写法详细介绍  javascript中闭包概念与用法深入理解  phpredis提高消息队列的实时性方法(推荐)  如何用VPS主机快速搭建个人网站?  简单实现Android文件上传  猎豹浏览器开发者工具怎么打开 猎豹浏览器F12调试工具使用【前端必备】  微信小程序制作网站有哪些,微信小程序需要做网站吗?  如何正确选择百度移动适配建站域名?  零基础网站服务器架设实战:轻量应用与域名解析配置指南  java获取注册ip实例  Laravel Docker环境搭建教程_Laravel Sail使用指南  android nfc常用标签读取总结  Laravel Blade组件怎么用_Laravel可复用视图组件的创建与使用  HTML5空格和margin有啥区别_空格与外边距的使用场景【说明】  如何为不同团队 ID 动态生成多个独立按钮  laravel服务容器和依赖注入怎么理解_laravel服务容器与依赖注入解析  高端智能建站公司优选:品牌定制与SEO优化一站式服务  浅析上传头像示例及其注意事项  Java类加载基本过程详细介绍  公司门户网站制作公司有哪些,怎样使用wordpress制作一个企业网站?  大连 网站制作,大连天途有线官网?