Spark Source Code Study (3) - Shuffle
Shuffle is best viewed as two phases: Shuffle Write and Shuffle Read.
The former is handled on the map side, which writes its output to disk (a data file plus an index file).
The latter is handled on the reduce side, where ShuffledRDD's compute method performs the read.
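For orientation, here is a minimal, self-contained example (not from the Spark source; the object name and master setting are just for illustration) of a job that crosses a shuffle boundary: reduceByKey introduces a ShuffleDependency, so its map-side tasks go through shuffle write and the result-stage tasks read the blocks back.
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-demo").setMaster("local[2]"))
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
    // reduceByKey inserts a stage boundary: the map stage writes shuffle files,
    // and the result stage reads them via ShuffledRDD.compute.
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}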
ShuffleMapTask
A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on the partitioner specified in the ShuffleDependency).
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
    serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)
The runTask method
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  val rdd = rddAndDep._1
  val dep = rddAndDep._2
  // While we use the old shuffle fetch protocol, we use partitionId as mapId in the
  // ShuffleBlockId construction.
  val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
    partitionId
  } else context.taskAttemptId()
  dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
}
The key statement here is dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition).
ShuffleWriteProcessor
This is the class that handles the shuffle write phase. Its core method is write, which creates a ShuffleWriter (via the shuffleManager's getWriter method) to perform the actual write. The body of its try block is as follows:
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
  dep.shuffleHandle,
  mapId,
  context,
  createMetricsReporter(context))
writer.write(
  rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
val mapStatus = writer.stop(success = true)
if (mapStatus.isDefined) {
  // Initiate shuffle push process if push based shuffle is enabled
  // The map task only takes care of converting the shuffle data file into multiple
  // block push requests. It delegates pushing the blocks to a different thread-pool -
  // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
  if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
    manager.shuffleBlockResolver match {
      case resolver: IndexShuffleBlockResolver =>
        val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
        new ShuffleBlockPusher(SparkEnv.get.conf)
          .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
      case _ =>
    }
  }
}
mapStatus.get
ShuffleManager
Early versions also had a HashShuffleManager implementation; current versions ship only SortShuffleManager.
Its getWriter method pattern-matches on the type of the handle and creates the corresponding ShuffleWriter (sketched below):
- unsafeShuffleHandle (SerializedShuffleHandle) -> UnsafeShuffleWriter
- bypassMergeSortHandle (BypassMergeSortShuffleHandle) -> BypassMergeSortShuffleWriter
- BaseShuffleHandle -> SortShuffleWriter
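For reference, the dispatch inside SortShuffleManager.getWriter is roughly the following pattern match (abridged; constructor arguments are elided here), which also explains the variable names in the list above:
// Abridged sketch of the match inside SortShuffleManager.getWriter; constructor arguments elided.
handle match {
  case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
    new UnsafeShuffleWriter(/* ... */)
  case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
    new BypassMergeSortShuffleWriter(/* ... */)
  case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
    new SortShuffleWriter(/* ... */)
}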
These three handles are created when the shuffle is registered via SortShuffleManager.registerShuffle:
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
Note: the first check is whether merge sort can be bypassed; the code comment gives two conditions.
If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and no map-side aggregation is needed, the writer simply writes **numPartitions** files directly and concatenates them at the end, which avoids serializing and deserializing the data twice to merge spilled files. The downside is that many files are open at the same time, which means more memory spent on buffers.
If these conditions hold, a bypass handle (BypassMergeSortShuffleHandle) is returned; the check itself is shown below.
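Roughly, the check in SortShuffleWriter.shouldBypassMergeSort (paraphrased from the source; the config constant SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD corresponds to spark.shuffle.sort.bypassMergeThreshold):
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}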
The second check decides whether serialized shuffle (a SerializedShuffleHandle) can be used, via the canUseSerializedShuffle method (see the sketch after this list). It requires:
- the serializer must support relocation of serialized objects (supportsRelocationOfSerializedObjects)
- no map-side combine (mapSideCombine) may be used
- the number of partitions must not exceed MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE, i.e. 16777216
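The method itself is roughly the following (a paraphrase of SortShuffleManager.canUseSerializedShuffle, with logging omitted):
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    false // the serializer must support relocation of serialized objects
  } else if (dependency.mapSideCombine) {
    false // map-side combine is not supported in serialized mode
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    false // at most 16777216 (2^24) output partitions
  } else {
    true
  }
}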
Otherwise, the default BaseShuffleHandle is used (no restrictions apply).
ShuffleWriter
ShuffleWriter is an abstract class with three concrete implementations:
- SortShuffleWriter
- BypassMergeSortShuffleWriter
- UnsafeShuffleWriter
What distinguishes these three writers?
SortShuffleWriter
// The write method
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  partitionLengths = mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
First a sorter (an ExternalSorter) is created, and sorter.insertAll(records) feeds the records into it.
The key statement is the sorter's writePartitionedMapOutput, which performs the actual write.
Note that whether map-side aggregation is used also determines how the sorter is constructed.
The subsequent write/commit call chain is:
- ExternalSorter.writePartitionedMapOutput
- LocalDiskShuffleMapOutputWriter.commitAllPartitions
- IndexShuffleBlockResolver.writeMetadataFileAndCommit
When writing to local disk, the metadata consists of two parts: the index file and the checksum file; the checksum is optional.
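As a small illustrative sketch (not the Spark source): the index file essentially stores the cumulative byte offsets of the partitions within the single data file, so a reducer asking for partition i only needs offsets(i) and offsets(i + 1).
// Illustrative only: how per-partition lengths turn into index-file offsets.
val partitionLengths = Array(120L, 0L, 340L)        // lengths reported by the writer
val offsets = partitionLengths.scanLeft(0L)(_ + _)  // Array(0, 120, 120, 460)
// A reducer fetching partition i reads bytes [offsets(i), offsets(i + 1)) of the data file.
def blockRange(reduceId: Int): (Long, Long) = (offsets(reduceId), offsets(reduceId + 1))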
sorter.insertAll
map is a PartitionedAppendOnlyMap, a hash-map-like structure. If map-side pre-aggregation is enabled, the map is used to update the value for each key, and spilling is checked via maybeSpillCollection(usingMap = true).
buffer is a PartitionedPairBuffer, a plain array structure that only appends; in that branch the spill check is called with usingMap = false.
...
if (shouldCombine) {
  // Combine values in-memory first using our AppendOnlyMap
  val mergeValue = aggregator.get.mergeValue
  val createCombiner = aggregator.get.createCombiner
  var kv: Product2[K, V] = null
  val update = (hadValue: Boolean, oldValue: C) => {
    if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
  }
  while (records.hasNext) {
    addElementsRead()
    kv = records.next()
    map.changeValue((getPartition(kv._1), kv._1), update)
    maybeSpillCollection(usingMap = true)
  }
}
If pre-aggregation is not enabled:
else {
  // Stick values into our buffer
  while (records.hasNext) {
    addElementsRead()
    val kv = records.next()
    buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
    maybeSpillCollection(usingMap = false)
  }
}
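To make the (partition, key) keying and the update closure in the combine branch concrete, here is a tiny standalone illustration (plain Scala, not Spark code; all names here are just for the example):
// Illustrative only: the createCombiner/mergeValue pattern used by the update closure above.
val createCombiner: Int => Int = v => v
val mergeValue: (Int, Int) => Int = _ + _
val records = Seq(("a", 1), ("b", 1), ("a", 1))
val numPartitions = 2
def getPartition(key: String): Int = math.abs(key.hashCode) % numPartitions
val combined = scala.collection.mutable.Map.empty[(Int, String), Int]
for ((k, v) <- records) {
  val mapKey = (getPartition(k), k) // keys are (partitionId, key), just like PartitionedAppendOnlyMap
  combined(mapKey) = combined.get(mapKey) match {
    case Some(old) => mergeValue(old, v) // key seen before: merge the new value in
    case None      => createCombiner(v)  // first occurrence of this key: create the combiner
  }
}
// combined now maps (partition of "a", "a") -> 2 and (partition of "b", "b") -> 1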
sorter.maybeSpillCollection(usingMap: Boolean)
This handles spilling the in-memory collection to disk.
usingMap = true means the map is the active collection:
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
  map = new PartitionedAppendOnlyMap[K, C]
}
usingMap = false means the buffer array is the active collection (same pattern, with buffer and PartitionedPairBuffer).
sorter.maybeSpill(collection: C, currentMemory: Long)
When should a spill happen?
If the number of elements read is a multiple of 32 and the current memory usage is at least the threshold myMemoryThreshold (initialized from SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD, i.e. spark.shuffle.spill.initialMemoryThreshold, 5 MB by default), the sorter tries to acquire more memory (amountToRequest via acquireMemory).
If the granted memory is still not enough, a spill is forced.
The other trigger is reading too many elements: if the count exceeds numElementsForceSpillThreshold (SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, Integer.MAX_VALUE by default), a spill is forced as well.
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
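A worked example of the memory accounting above (the numbers are made up):
// Made-up numbers to illustrate maybeSpill's threshold growth.
var myMemoryThreshold = 5L * 1024 * 1024                     // initial threshold: 5 MB
val currentMemory = 6L * 1024 * 1024                         // estimated size of the collection
val amountToRequest = 2 * currentMemory - myMemoryThreshold  // 12 MB - 5 MB = 7 MB requested
val granted = 1L * 1024 * 1024                               // suppose only 1 MB is granted
myMemoryThreshold += granted                                 // threshold grows to 6 MB
val shouldSpill = currentMemory >= myMemoryThreshold         // 6 MB >= 6 MB, so spill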
logSpillage(currentMemory) logs the spill, and spill(collection) performs the actual spill.
After spilling, releaseMemory() is called to release the memory.
spill(collection) -> spillMemoryIteratorToDisk(inMemoryIterator)
In ExternalSorter, spill actually delegates to spillMemoryIteratorToDisk.
Its doc comment: "Spill contents of in-memory iterator to a temporary file on disk."
The temporary file is created via diskBlockManager.createTempShuffleBlock().
The writer used for the spill:
val writer: DiskBlockObjectWriter =
  blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
fileBufferSize is SHUFFLE_FILE_BUFFER_SIZE (spark.shuffle.file.buffer), 32 KB by default.
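If you want to experiment with this buffer, it is an ordinary Spark configuration entry; for example (the 64k value here is arbitrary, not a recommendation):
import org.apache.spark.SparkConf
// Double the per-spill file buffer from its 32k default.
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64k")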
sorter.partitionedIterator
Finally, it returns an iterator over all of the data: both the spilled temporary files on disk and whatever is still in the in-memory buffer.
BypassMergeSortShuffleWriter
These notes do not walk through this writer; in short (consistent with the bypass conditions above), it opens one disk writer per reduce partition, writes each record directly into the file of its target partition with no sorting or aggregation, and finally concatenates the per-partition files into the single data file plus index, which is why it is only chosen for small partition counts without map-side combine.
The Shuffle Read phase
Think of the reduce side as the result stage; then through ResultStage we should be able to find the read logic. ResultTask.runTask contains this line:
func(context, rdd.iterator(partition, context))
It calls the RDD's iterator method, which in turn goes through RDD.getOrCompute -> computeOrReadCheckpoint -> compute.
Since we are looking at the shuffle path, the method to inspect is ShuffledRDD.compute.
There, SparkEnv.get.shuffleManager.getReader
creates the reader, and its read method is invoked.
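For reference, ShuffledRDD.compute in recent Spark versions is roughly:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  val metrics = context.taskMetrics().createTempShuffleReadMetrics()
  // getReader is asked for the single reduce partition [split.index, split.index + 1)
  SparkEnv.get.shuffleManager.getReader(
    dep.shuffleHandle, split.index, split.index + 1, context, metrics)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}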