Shuffle can be viewed as two phases: Shuffle Write and Shuffle Read.
The former is handled on the map side, which writes the output to disk (a data file plus an index file).
The latter is handled on the reduce side, where the read is performed in ShuffledRDD's compute method.
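
For orientation, here is a minimal job that triggers a shuffle (illustrative only; the object name ShuffleDemo, the app name and local[2] master are made up): reduceByKey introduces a ShuffleDependency, so the upstream stage performs the shuffle write and the downstream stage performs the shuffle read.

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("shuffle-demo").setMaster("local[2]"))
    val counts = sc.parallelize(Seq("a", "b", "a"), numSlices = 2)
      .map(w => (w, 1))
      .reduceByKey(_ + _)   // creates a ShuffledRDD: map side writes, reduce side reads
    counts.collect().foreach(println)
    sc.stop()
  }
}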

ShuffleMapTask

ShuffleMapTask divides the records of an RDD into a number of buckets (based on the partitioner specified in the ShuffleDependency).
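
As a quick illustration (not part of ShuffleMapTask itself; the key "someKey" and the partition count 4 are arbitrary), the bucket of a record is simply whatever the dependency's partitioner returns for its key, e.g. with a HashPartitioner:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(4)
val bucket = partitioner.getPartition("someKey")   // an index in [0, 4)

The ShuffleMapTask class signature: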

private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
    serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)

The runTask method



  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    // While we use the old shuffle fetch protocol, we use partitionId as mapId in the
    // ShuffleBlockId construction.
    val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
      partitionId
    } else context.taskAttemptId()
    dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
  }


The key statement here is dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition).

ShuffleWriteProcessor

This is the processing class for the shuffle write phase. Its core method is write, which obtains a ShuffleWriter (via the shuffleManager's getWriter method) and performs the write. The body of its try block looks like this:


      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](
        dep.shuffleHandle,
        mapId,
        context,
        createMetricsReporter(context))
      writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      val mapStatus = writer.stop(success = true)
      if (mapStatus.isDefined) {
        // Initiate shuffle push process if push based shuffle is enabled
        // The map task only takes care of converting the shuffle data file into multiple
        // block push requests. It delegates pushing the blocks to a different thread-pool -
        // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
        if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
          manager.shuffleBlockResolver match {
            case resolver: IndexShuffleBlockResolver =>
              val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
              new ShuffleBlockPusher(SparkEnv.get.conf)
                .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
            case _ =>
          }
        }
      }
      mapStatus.get


Early versions of Spark also had a HashShuffleManager implementation of ShuffleManager; current versions only ship SortShuffleManager.
Its getWriter method pattern-matches on the handle type and creates the corresponding ShuffleWriter:

  • SerializedShuffleHandle -> UnsafeShuffleWriter
  • BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
  • BaseShuffleHandle -> SortShuffleWriter

These three handles are created when the shuffle is registered via SortShuffleManager.registerShuffle:

override def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, dependency)
    }
  }

Note: it first checks whether the merge-sort path can be bypassed; the code comments give two conditions.

If the number of partitions does not exceed spark.shuffle.sort.bypassMergeThreshold and no map-side aggregation is needed, the writer writes numPartitions files directly and concatenates them at the end, which avoids serializing and deserializing the data twice for a merge sort. The downside is that many files (and their in-memory buffers) are open at the same time, so more memory is used for buffering.

If these conditions are met, a BypassMergeSortShuffleHandle is returned.
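
That decision can be paraphrased as a small standalone predicate (a sketch, not the Spark source; the parameter names are mine, and 200 is the default of spark.shuffle.sort.bypassMergeThreshold):

// Paraphrase of SortShuffleWriter.shouldBypassMergeSort: bypass only when there is
// no map-side combine and the partition count stays within the configured threshold.
def shouldBypassMergeSort(
    mapSideCombine: Boolean,
    numPartitions: Int,
    bypassMergeThreshold: Int = 200): Boolean = {
  if (mapSideCombine) false               // map-side aggregation needs sorting/merging
  else numPartitions <= bypassMergeThreshold
}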

The second check decides whether the serialized shuffle (SerializedShuffleHandle) can be used, via the canUseSerializedShuffle method. The requirements are:

  • the serializer must support relocation of serialized objects (supportsRelocationOfSerializedObjects)
  • map-side combine (mapSideCombine) must not be used
  • the number of partitions must not exceed MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE, i.e. 16777216

Otherwise, the default BaseShuffleHandle is used; it has no restrictions.
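
The serialized-shuffle check can likewise be paraphrased as a standalone predicate (a sketch, not the Spark source; the parameter names are mine, and 16777216 is 2^24, i.e. MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE):

// Paraphrase of SortShuffleManager.canUseSerializedShuffle.
def canUseSerializedShuffle(
    supportsRelocationOfSerializedObjects: Boolean,
    mapSideCombine: Boolean,
    numPartitions: Int): Boolean = {
  val maxPartitions = 1 << 24   // 16777216
  supportsRelocationOfSerializedObjects &&
    !mapSideCombine &&
    numPartitions <= maxPartitions
}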

ShuffleWriter

ShuffleWriter is an abstract class with three concrete implementations:

  • SortShuffleWriter
  • BypassMergeSortShuffleWriter
  • UnsafeShuffleWriter

What distinguishes these three writers?

SortShuffleWriter

// The write method

  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)

 
    val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
      dep.shuffleId, mapId, dep.partitioner.numPartitions)
    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
    partitionLengths = mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
  }

It first creates a sorter (an ExternalSorter) and feeds it the records via sorter.insertAll(records).
The key statement is the sorter's writePartitionedMapOutput, which performs the write.

Note that whether map-side aggregation is used is also taken into account when constructing the sorter. The write then goes through the following call chain:

  • ExternalSorter.writePartitionedMapOutput

  • LocalDiskShuffleMapOutputWriter.commitAllPartitions

  • IndexShuffleBlockResolver.writeMetadataFileAndCommit

When writing to local disk, the metadata consists of two parts: the index file and the checksum file; the checksum is optional.
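
For intuition, the index file is essentially a running sum of the per-partition byte lengths, so that reducer i can seek to offsets(i) and read offsets(i + 1) - offsets(i) bytes from the data file. A tiny illustration with made-up lengths:

// Cumulative offsets as stored in the shuffle index file (one extra leading 0).
val partitionLengths = Array[Long](10L, 0L, 25L, 7L)
val offsets = partitionLengths.scanLeft(0L)(_ + _)   // Array(0, 10, 10, 35, 42)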

sorter.insertAll

map is a PartitionedAppendOnlyMap, a hash map: when map-side combine is enabled, incoming key-value pairs are merged into it, and the spill check is maybeSpillCollection(usingMap = true).
buffer is a PartitionedPairBuffer, a plain array structure that only appends records; its spill check passes usingMap = false.

...
    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    }

If map-side combine is not enabled:

....
 else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }

sorter.maybeSpillCollection(usingMap: Boolean)

This spills the in-memory collection to disk when necessary.
usingMap = true means the map is in use:

      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }

usingMap = false means the buffer array is in use.

sorter.maybeSpill(collection: C, currentMemory: Long)

When should it spill? There are two triggers.

The first: the number of elements read is a multiple of 32 and the current memory estimate is at least the threshold (initially SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD, configured by spark.shuffle.spill.initialMemoryThreshold, 5MB by default). In that case it tries to acquire more memory (amountToRequest via acquireMemory); if the grant is not enough to keep the collection in memory, the spill goes ahead.

The second: too many elements have been read, i.e. the count exceeds numElementsForceSpillThreshold (SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, Integer.MAX_VALUE by default), which also forces a spill.

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

logSpillage(currentMemory) logs the spill and spill(collection) performs the actual spilling.
Afterwards, releaseMemory() is called to free the memory.

spill(collection) -> spillMemoryIteratorToDisk(inMemoryIterator)

In ExternalSorter, the spill method ultimately calls spillMemoryIteratorToDisk.

As its doc comment says: "Spill contents of in-memory iterator to a temporary file on disk."

The temporary file is created with:
diskBlockManager.createTempShuffleBlock()

The writer used for the spill:
val writer: DiskBlockObjectWriter = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

fileBufferSize is SHUFFLE_FILE_BUFFER_SIZE (spark.shuffle.file.buffer), 32KB by default.
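
If the 32KB default turns out to be too small for a workload, the buffer can be raised through configuration; the value 64k below is just an example:

import org.apache.spark.SparkConf

// Raise the per-spill-file buffer from the default 32k (spark.shuffle.file.buffer).
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64k")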

sorter.partitionedIterator

Finally, it returns all of the data, merging the spill files on disk with what is still in the in-memory buffer.

BypassMergeSortShuffleWriter

As described for the bypass handle above, this writer opens one file per reduce partition, writes records straight into those files, and finally concatenates them into a single data file with an accompanying index.

The Shuffle Read Phase

Think of the reduce side as a result stage; following ResultStage should lead to the read logic. ResultTask.runTask contains this line:
func(context, rdd.iterator(partition, context))
Calling the RDD's iterator method leads to RDD.getOrCompute -> computeOrReadCheckpoint -> compute.

Since we are interested in the shuffle path, we look at ShuffledRDD's compute method.

There, the line SparkEnv.get.shuffleManager.getReader creates the reader, and its read method performs the actual fetch.
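
Roughly, ShuffledRDD.compute looks like the following (paraphrased from Spark 3.x; details may differ slightly between versions):

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  val metrics = context.taskMetrics().createTempShuffleReadMetrics()
  // Ask the ShuffleManager for a reader covering exactly this reduce partition;
  // read() fetches the blocks and applies aggregation/ordering if required.
  SparkEnv.get.shuffleManager.getReader(
      dep.shuffleHandle, split.index, split.index + 1, context, metrics)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}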