This article draws on https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview

Paper

Every Spark application's driver program is driven by the user's main function. Spark provides one core abstraction over the data: the RDD (Resilient Distributed Dataset).

Characteristics of an RDD:

  • Immutable objects; a logical representation of a dataset
  • Distributed storage plus parallel computation; each slice of the data is called a partition
  • Fault tolerant; failed computations can be retried
  • Can be cached in memory or on disk
  • Can be transformed into other RDDs

Note that Spark does not support nested RDDs (an RDD used inside another RDD); see SPARK-5063.
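
A rough illustration of the restriction (assuming sc is an existing SparkContext; the variable names are made up): SPARK-5063 forbids invoking RDD operations from inside another RDD's transformations or actions.

// Two independent RDDs created on the driver.
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 6)

// Not allowed: rdd2.count() is an action invoked from inside rdd1.map, i.e. from within a task.
// At runtime this fails with the SPARK-5063 error.
// val nested = rdd1.map(x => x * rdd2.count()).collect()

// Supported pattern: compute the inner value on the driver first, then use it in the closure.
val factor = rdd2.count()
val scaled = rdd1.map(x => x * factor).collect()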

Spark RDD properties. RDD is an abstract class, declared as:


abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

RDD[T: ClassTag] is a generic class; T is the type of the elements held in the RDD.
The SparkContext is a required constructor argument. The class defines several key members (see the sketch after this list):

  • compute: the concrete computation logic, implemented by each subclass.
  • getPartitions: returns all partitions of this RDD (return type Array[Partition]).
  • getDependencies: the RDD's dependencies on its parent RDDs.
  • getPreferredLocations: specifies placement preferences for each partition.
  • partitioner: an optional property that subclasses may override.
  • persist, cache: persistence.
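
A minimal sketch of how these members fit together in a custom subclass (RangeLikeRDD and SlicePartition are hypothetical names, not Spark classes); each partition here simply yields its own index:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: it only needs to carry its index.
class SlicePartition(override val index: Int) extends Partition

// Hypothetical RDD subclass: numSlices partitions, each producing a single Int.
class RangeLikeRDD(sc: SparkContext, numSlices: Int) extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    (0 until numSlices).map(i => new SlicePartition(i): Partition).toArray

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)  // the "data" of each partition is just its index
}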

RDDs and basic operations

Creating RDDs

There are three ways to create an RDD: from a local in-memory collection, from external storage, or by transforming another RDD.

  • From an in-memory collection (see the sketch after this list):

    • makeRDD
    • parallelize
  • From external storage:

    • textFile: a single file, or a glob pattern
    • wholeTextFiles: reads a directory of many small text files
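
A short sketch tying these together (local mode and the file paths are only placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object CreateRddExamples {
  def main(args: Array[String]): Unit = {
    // Local mode, only for illustration.
    val sc = new SparkContext(new SparkConf().setAppName("create-rdd").setMaster("local[*]"))

    // From an in-memory collection.
    val fromParallelize = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2)
    val fromMakeRDD     = sc.makeRDD(Seq("a", "b", "c"))

    // From external storage; the paths are placeholders.
    val singleFile     = sc.textFile("input/hello.txt")   // one file or a glob
    val manySmallFiles = sc.wholeTextFiles("input/logs/") // RDD[(fileName, content)]

    println(fromParallelize.count())
    sc.stop()
  }
}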

RDD operations

RDDs support two types of operations: transformations and actions.
A transformation builds a new RDD from an existing one; an action runs a computation on the RDD and returns a value to the driver.
All transformations in Spark are lazy.

    val lines: RDD[String] = context.textFile("input/hello.txt")
    val lineLengths: RDD[Int] = lines.map((line: String) => line.length)
    val totalLength: Int = lineLengths.reduce((a, b) => a + b)


RDD dependency lineage; example code:

val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

RDD persistence

RDDs have two persistence (caching) methods: cache and persist.
cache simply calls persist with its default argument (cache in memory); to persist to disk, pass a different StorageLevel.

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

Pay particular attention to the StorageLevel values in the source.
Persistence is itself lazy: nothing is cached until an action is triggered.
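
For example, continuing with the errors RDD above (a sketch; DISK_ONLY is only one of the available levels):

import org.apache.spark.storage.StorageLevel

// Ask Spark to keep the filtered RDD on disk rather than in memory.
errors.persist(StorageLevel.DISK_ONLY)

errors.count()   // first action: the RDD is computed and written to the disk cache
errors.take(10)  // subsequent actions reuse the persisted data instead of re-reading HDFS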

Comparing RDDs with distributed shared memory (DSM)

The driver launches multiple workers; each worker reads its share of the input data from HDFS and computes its RDD partitions in memory.

Spark pipelines narrow-dependency transformations into a single job stage, and data must be shuffled between stages.
This stage division is performed by Spark's DAGScheduler when it generates tasks.
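
One way to see the stage boundary (a sketch, assuming sc is an existing SparkContext and the path is a placeholder): RDD.toDebugString prints the lineage, including the shuffle dependency that starts a new stage.

// Narrow dependencies (flatMap, map) are pipelined into one stage.
val words  = sc.textFile("input/hello.txt").flatMap(_.split(" "))
val counts = words.map((_, 1)).reduceByKey(_ + _)  // wide dependency: a shuffle boundary

// The lineage shows a ShuffledRDD sitting on top of the pipelined map stage.
println(counts.toDebugString)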

When scheduling tasks, Spark considers where each partition lives in the cluster and prefers to run tasks close to their data.

Which operations trigger a shuffle

Why shuffle?

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.

My understanding, in short: some computations can only be performed once all values with the same key have been brought together, for example summing by key; with even one element missing, the result is wrong. Since the input data is scattered across partitions, the process of moving and regrouping data so that everything a given key needs ends up in the same place is the shuffle.
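
A small sketch of this (assuming sc is an existing SparkContext): before the shuffle, values for the same key sit in different partitions; reduceByKey brings them together and sums them.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1)), numSlices = 2)

// Before the shuffle: key "a" has values in both partitions.
pairs.mapPartitionsWithIndex((i, it) => it.map(kv => s"partition $i: $kv"))
  .collect()
  .foreach(println)

// reduceByKey shuffles so that all values of a key meet in one place, then sums them.
pairs.reduceByKey(_ + _).collect().foreach(println)  // (a,2), (b,2)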

Which operators cause a shuffle?


Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey' operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join. 

Repartitioning operators:

  • repartition
  • coalesce

ByKey operators:

  • groupByKey
  • reduceByKey
  • sortByKey
  • combineByKey
  • foldByKey
  • aggregateByKey

join operators:

  • join
  • cogroup

And there is also distinct.

countByKey is an action, not a transformation.

The documentation notes that it should only be used when the resulting map is expected to be small; it returns a Map[K, Long], not an RDD,
and the entire result is loaded into the driver's memory.

/**
   * Count the number of elements for each key, collecting the results to a local Map.
   *
   * @note This method should only be used if the resulting map is expected to be small, as
   * the whole thing is loaded into the driver's memory.
   * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
   * returns an RDD[T, Long] instead of a map.
   */
  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }
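
A side-by-side sketch of the two approaches from that doc comment (assuming sc is an existing SparkContext):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Action: returns a local Map on the driver; only safe when the key space is small.
val smallResult = pairs.countByKey()                           // Map(a -> 2, b -> 1)

// Transformation chain: stays distributed, so it also works for a huge key space.
val largeResult = pairs.mapValues(_ => 1L).reduceByKey(_ + _)  // RDD[(String, Long)]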

distinct is also a shuffle operator.

def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
      // Create an instance of external append only map which ignores values.
      val map = new ExternalAppendOnlyMap[T, Null, Null](
        createCombiner = _ => null,
        mergeValue = (a, b) => a,
        mergeCombiners = (a, b) => a)
      map.insertAll(partition.map(_ -> null))
      map.iterator.map(_._1)
    }


  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    // removeDuplicatesInPartition (shown above) is defined here in the original source
    partitioner match {
      case Some(_) if numPartitions == partitions.length =>
        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
      case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    }
  }


Several ByKey operators

reduceByKey(_ + _)
aggregateByKey(0)(_ + _, _ + _)
foldByKey(0)(_ + _)
combineByKey((v: Int) => v, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y)
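
A runnable sketch (assuming sc is an existing SparkContext): on an RDD[(String, Int)], the four calls above all produce the same per-key sum.

val kv = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

val r1 = kv.reduceByKey(_ + _)
val r2 = kv.aggregateByKey(0)(_ + _, _ + _)
val r3 = kv.foldByKey(0)(_ + _)
val r4 = kv.combineByKey((v: Int) => v, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y)

Seq(r1, r2, r3, r4).foreach(r => println(r.collect().toMap))  // Map(a -> 3, b -> 3) each time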

Source: PairRDDFunctions.scala

  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)

All of these are implemented by calling this one function; they differ only in the arguments they pass.

combineByKey takes three functions:
The first parameter, createCombiner, is applied to the first value seen for each key.
The second parameter, mergeValue, controls the aggregation logic within a partition.
The third parameter, mergeCombiners, controls how the per-partition results are merged across partitions.
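
A sketch of what the three parameters do (the data and names are illustrative, and sc is assumed to be an existing SparkContext): computing a per-key average by carrying (sum, count) as the combiner type C.

val scores = sc.parallelize(Seq(("math", 90), ("math", 80), ("english", 70)))

val avg = scores.combineByKey(
    (v: Int) => (v, 1),                                          // createCombiner: first value of a key
    (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue: within a partition
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: across partitions
  ).mapValues { case (sum, count) => sum.toDouble / count }

avg.collect().foreach(println)  // (math,85.0), (english,70.0)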



The base function

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }



The foldByKey overload


  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
   */
  def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    // When deserializing, use a lazy val to create just one instance of the serializer per task
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

    val cleanedFunc = self.context.clean(func)
    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
      cleanedFunc, cleanedFunc, partitioner)
  }
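
The doc comment above stresses that the zero value must be neutral. A small sketch (assuming sc is an existing SparkContext): with a neutral zero, foldByKey agrees with reduceByKey regardless of how the data is partitioned.

val data = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), numSlices = 3)

// 0 is neutral for addition, so the result does not depend on how many times
// the zero value is applied (once per key per partition).
val folded  = data.foldByKey(0)(_ + _).collect().toMap
val reduced = data.reduceByKey(_ + _).collect().toMap
assert(folded == reduced)  // Map(a -> 3, b -> 3)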