author    Reynold Xin <rxin@databricks.com>      2015-09-18 13:48:41 -0700
committer Josh Rosen <joshrosen@databricks.com>  2015-09-18 13:48:41 -0700
commit    348d7c9a93dd00d3d1859342a8eb0aea2e77f597 (patch)
tree      95d9a65d2e95aedd3eb5f31ac41e779e79aa8d6d /core
parent    3a22b1004f527d54d399dd0225cd7f2f8ffad9c5 (diff)
[SPARK-9808] Remove hash shuffle file consolidation.
Author: Reynold Xin <rxin@databricks.com>

Closes #8812 from rxin/SPARK-9808-1.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala     | 178
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                 |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala                    |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala | 110
4 files changed, 13 insertions(+), 287 deletions(-)
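
For context, the consolidation feature deleted here was an opt-in SparkConf flag. A minimal sketch of the old opt-in, which becomes a no-op after this patch (illustrative only):

```scala
import org.apache.spark.SparkConf

// Sketch of the pre-SPARK-9808 opt-in (illustrative only). After this patch the
// "spark.shuffle.consolidateFiles" key is never read: hash-based shuffle always
// writes one file per (map task, reducer) pair.
val conf = new SparkConf(false)
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
  .set("spark.shuffle.consolidateFiles", "true") // no longer has any effect
```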
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index c057de9b3f..d9902f96df 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -17,9 +17,7 @@
package org.apache.spark.shuffle
-import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
@@ -28,10 +26,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -43,24 +39,7 @@ private[spark] trait ShuffleWriterGroup {
/**
* Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer (this set of files is called a ShuffleFileGroup).
- *
- * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
- * files, it releases them for another task.
- * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
- * - shuffleId: The unique id given to the entire shuffle stage.
- * - bucketId: The id of the output partition (i.e., reducer id)
- * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
- * time owns a particular fileId, and this id is returned to a pool when the task finishes.
- * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
- * that specifies where in a given file the actual block data is located.
- *
- * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
- * each block stored in each file. In order to find the location of a shuffle block, we search the
- * files within a ShuffleFileGroups associated with the block's reducer.
+ * per reducer.
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
@@ -71,26 +50,15 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
private lazy val blockManager = SparkEnv.get.blockManager
- // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
- // TODO: Remove this once the shuffle file consolidation feature is stable.
- private val consolidateShuffleFiles =
- conf.getBoolean("spark.shuffle.consolidateFiles", false)
-
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
/**
- * Contains all the state related to a particular shuffle. This includes a pool of unused
- * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
+ * Contains all the state related to a particular shuffle.
*/
- private class ShuffleState(val numBuckets: Int) {
- val nextFileId = new AtomicInteger(0)
- val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
- val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-
+ private class ShuffleState(val numReducers: Int) {
/**
* The mapIds of all map tasks completed on this Executor for this shuffle.
- * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
*/
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
@@ -104,24 +72,16 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
* Get a ShuffleWriterGroup for the given map task, which will register it as complete
* when the writers are closed successfully
*/
- def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
+ def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
new ShuffleWriterGroup {
- shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
+ shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
private val shuffleState = shuffleStates(shuffleId)
- private var fileGroup: ShuffleFileGroup = null
val openStartTime = System.nanoTime
val serializerInstance = serializer.newInstance()
- val writers: Array[DiskBlockObjectWriter] = if (consolidateShuffleFiles) {
- fileGroup = getUnusedFileGroup()
- Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
- val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
- writeMetrics)
- }
- } else {
- Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
+ val writers: Array[DiskBlockObjectWriter] = {
+ Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
@@ -142,58 +102,14 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
override def releaseWriters(success: Boolean) {
- if (consolidateShuffleFiles) {
- if (success) {
- val offsets = writers.map(_.fileSegment().offset)
- val lengths = writers.map(_.fileSegment().length)
- fileGroup.recordMapOutput(mapId, offsets, lengths)
- }
- recycleFileGroup(fileGroup)
- } else {
- shuffleState.completedMapTasks.add(mapId)
- }
- }
-
- private def getUnusedFileGroup(): ShuffleFileGroup = {
- val fileGroup = shuffleState.unusedFileGroups.poll()
- if (fileGroup != null) fileGroup else newFileGroup()
- }
-
- private def newFileGroup(): ShuffleFileGroup = {
- val fileId = shuffleState.nextFileId.getAndIncrement()
- val files = Array.tabulate[File](numBuckets) { bucketId =>
- val filename = physicalFileName(shuffleId, bucketId, fileId)
- blockManager.diskBlockManager.getFile(filename)
- }
- val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
- shuffleState.allFileGroups.add(fileGroup)
- fileGroup
- }
-
- private def recycleFileGroup(group: ShuffleFileGroup) {
- shuffleState.unusedFileGroups.add(group)
+ shuffleState.completedMapTasks.add(mapId)
}
}
}
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
- if (consolidateShuffleFiles) {
- // Search all file groups associated with this shuffle.
- val shuffleState = shuffleStates(blockId.shuffleId)
- val iter = shuffleState.allFileGroups.iterator
- while (iter.hasNext) {
- val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
- if (segmentOpt.isDefined) {
- val segment = segmentOpt.get
- return new FileSegmentManagedBuffer(
- transportConf, segment.file, segment.offset, segment.length)
- }
- }
- throw new IllegalStateException("Failed to find shuffle block: " + blockId)
- } else {
- val file = blockManager.diskBlockManager.getFile(blockId)
- new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
- }
+ val file = blockManager.diskBlockManager.getFile(blockId)
+ new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}
/** Remove all the blocks / files and metadata related to a particular shuffle. */
@@ -209,17 +125,9 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
shuffleStates.get(shuffleId) match {
case Some(state) =>
- if (consolidateShuffleFiles) {
- for (fileGroup <- state.allFileGroups.asScala;
- file <- fileGroup.files) {
- file.delete()
- }
- } else {
- for (mapId <- state.completedMapTasks.asScala;
- reduceId <- 0 until state.numBuckets) {
- val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
- blockManager.diskBlockManager.getFile(blockId).delete()
- }
+ for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
+ val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
+ blockManager.diskBlockManager.getFile(blockId).delete()
}
logInfo("Deleted all files for shuffle " + shuffleId)
true
@@ -229,10 +137,6 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
}
- private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
- "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
- }
-
private def cleanup(cleanupTime: Long) {
shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
}
@@ -241,59 +145,3 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
metadataCleaner.cancel()
}
}
-
-private[spark] object FileShuffleBlockResolver {
- /**
- * A group of shuffle files, one per reducer.
- * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
- */
- private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
- private var numBlocks: Int = 0
-
- /**
- * Stores the absolute index of each mapId in the files of this group. For instance,
- * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
- */
- private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
-
- /**
- * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
- * position in the file.
- * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
- * reducer.
- */
- private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
- new PrimitiveVector[Long]()
- }
- private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
- new PrimitiveVector[Long]()
- }
-
- def apply(bucketId: Int): File = files(bucketId)
-
- def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
- assert(offsets.length == lengths.length)
- mapIdToIndex(mapId) = numBlocks
- numBlocks += 1
- for (i <- 0 until offsets.length) {
- blockOffsetsByReducer(i) += offsets(i)
- blockLengthsByReducer(i) += lengths(i)
- }
- }
-
- /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
- def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
- val file = files(reducerId)
- val blockOffsets = blockOffsetsByReducer(reducerId)
- val blockLengths = blockLengthsByReducer(reducerId)
- val index = mapIdToIndex.getOrElse(mapId, -1)
- if (index >= 0) {
- val offset = blockOffsets(index)
- val length = blockLengths(index)
- Some(new FileSegment(file, offset, length))
- } else {
- None
- }
- }
- }
-}
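
With consolidation gone, the resolver's mapping is strictly one file per ShuffleBlockId, as the simplified getBlockData above shows. A self-contained sketch of the resulting naming and file count (BlockName and ShuffleFileCount are illustrative helpers that mirror ShuffleBlockId.name, not Spark code):

```scala
// Illustrative stand-in for org.apache.spark.storage.ShuffleBlockId (not Spark's
// class): each block now resolves to exactly one file named after the
// (shuffle, map, reduce) triple.
final case class BlockName(shuffleId: Int, mapId: Int, reduceId: Int) {
  def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}

object ShuffleFileCount {
  // Without consolidation, a hash shuffle with M map tasks and R reducers
  // leaves M * R physical files on disk -- the cost the removed feature tried
  // to reduce by sharing files across concurrently running tasks.
  def numFiles(numMapTasks: Int, numReducers: Int): Long =
    numMapTasks.toLong * numReducers

  def main(args: Array[String]): Unit = {
    println(BlockName(0, 5, 2).name) // shuffle_0_5_2
    println(numFiles(1000, 200))     // 200000
  }
}
```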
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d31aa68eb6..bca3942f8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -106,15 +106,6 @@ private[spark] class BlockManager(
}
}
- // Check that we're not using external shuffle service with consolidated shuffle files.
- if (externalShuffleServiceEnabled
- && conf.getBoolean("spark.shuffle.consolidateFiles", false)
- && shuffleManager.isInstanceOf[HashShuffleManager]) {
- throw new UnsupportedOperationException("Cannot use external shuffle service with consolidated"
- + " shuffle files in hash-based shuffle. Please disable spark.shuffle.consolidateFiles or "
- + " switch to sort-based shuffle.")
- }
-
var blockManagerId: BlockManagerId = _
// Address of the server that serves this executor's shuffle files. This is either an external
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 1f45956282..feb9533604 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -154,9 +154,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
override def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
- // If consolidation mode is used With HashShuffleMananger, the physical filename for the block
- // is different from blockId.name. So the file returns here will not be exist, thus we avoid to
- // delete the whole consolidated file by mistake.
if (file.exists()) {
file.delete()
} else {
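
The deleted comment above explained why remove() had to tolerate a filename mismatch under consolidation. A minimal sketch of the guard that remains (parameter names are illustrative, not DiskStore's actual fields):

```scala
import java.io.File

// Minimal sketch of the guarded delete kept in DiskStore.remove. With
// consolidation gone, blockId.name always matches the physical file, so a
// missing file simply means the block was never written or was already removed.
def removeBlockFile(blockDir: File, blockName: String): Boolean = {
  val file = new File(blockDir, blockName)
  if (file.exists()) file.delete() else false
}
```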
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
deleted file mode 100644
index 491dc3659e..0000000000
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import java.io.{File, FileWriter}
-
-import scala.language.reflectiveCalls
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.FileShuffleBlockResolver
-import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
-
-class HashShuffleManagerSuite extends SparkFunSuite with LocalSparkContext {
- private val testConf = new SparkConf(false)
-
- private def checkSegments(expected: FileSegment, buffer: ManagedBuffer) {
- assert(buffer.isInstanceOf[FileSegmentManagedBuffer])
- val segment = buffer.asInstanceOf[FileSegmentManagedBuffer]
- assert(expected.file.getCanonicalPath === segment.getFile.getCanonicalPath)
- assert(expected.offset === segment.getOffset)
- assert(expected.length === segment.getLength)
- }
-
- test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
-
- val conf = new SparkConf(false)
- // reset after EACH object write. This is to ensure that there are bytes appended after
- // an object is written. So if the codepaths assume writeObject is end of data, this should
- // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
- conf.set("spark.serializer.objectStreamReset", "1")
- conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
- conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
-
- sc = new SparkContext("local", "test", conf)
-
- val shuffleBlockResolver =
- SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]
-
- val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
- new ShuffleWriteMetrics)
- for (writer <- shuffle1.writers) {
- writer.write("test1", "value")
- writer.write("test2", "value")
- }
- for (writer <- shuffle1.writers) {
- writer.commitAndClose()
- }
-
- val shuffle1Segment = shuffle1.writers(0).fileSegment()
- shuffle1.releaseWriters(success = true)
-
- val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
- new ShuffleWriteMetrics)
-
- for (writer <- shuffle2.writers) {
- writer.write("test3", "value")
- writer.write("test4", "vlue")
- }
- for (writer <- shuffle2.writers) {
- writer.commitAndClose()
- }
- val shuffle2Segment = shuffle2.writers(0).fileSegment()
- shuffle2.releaseWriters(success = true)
-
- // Now comes the test :
- // Write to shuffle 3; and close it, but before registering it, check if the file lengths for
- // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length
- // of block based on remaining data in file : which could mess things up when there is
- // concurrent read and writes happening to the same shuffle group.
-
- val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
- new ShuffleWriteMetrics)
- for (writer <- shuffle3.writers) {
- writer.write("test3", "value")
- writer.write("test4", "value")
- }
- for (writer <- shuffle3.writers) {
- writer.commitAndClose()
- }
- // check before we register.
- checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
- shuffle3.releaseWriters(success = true)
- checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
- shuffleBlockResolver.removeShuffle(1)
- }
-
- def writeToFile(file: File, numBytes: Int) {
- val writer = new FileWriter(file, true)
- for (i <- 0 until numBytes) writer.write(i)
- writer.close()
- }
-}
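
The deleted suite only exercised file consolidation, so it is removed along with the feature. If a smoke test for the simplified one-file-per-block path were still wanted, a sketch reusing only the APIs the old suite exercised (forMapTask, commitAndClose, releaseWriters, getBlockData, removeShuffle) might look like the following; the object name and test values are hypothetical and not part of this patch:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.FileSegmentManagedBuffer
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.shuffle.FileShuffleBlockResolver
import org.apache.spark.storage.ShuffleBlockId

// Hedged sketch of a minimal check for the simplified resolver path.
object SimplifiedShuffleSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
    val sc = new SparkContext("local", "test", conf)
    try {
      val resolver = SparkEnv.get.shuffleManager.shuffleBlockResolver
        .asInstanceOf[FileShuffleBlockResolver]
      // One map task, one reducer: exactly one shuffle file is written.
      val group = resolver.forMapTask(1, 0, 1, new JavaSerializer(conf),
        new ShuffleWriteMetrics)
      for (writer <- group.writers) {
        writer.write("key", "value")
        writer.commitAndClose()
      }
      group.releaseWriters(success = true)
      // The block comes back as a single whole-file segment; no file-group search.
      val buffer = resolver.getBlockData(ShuffleBlockId(1, 0, 0))
      assert(buffer.isInstanceOf[FileSegmentManagedBuffer])
      resolver.removeShuffle(1)
    } finally {
      sc.stop()
    }
  }
}
```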