diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2015-05-08 12:24:06 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-05-08 12:24:06 -0700 |
commit | 4b3bb0e43ca7e1a27308516608419487b6a844e6 (patch) | |
tree | 07327d1d8cf2c8a8bc34217c706d3d761e61c31e /core | |
parent | 2d05f325dc3c70349bd17ed399897f22d967c687 (diff) | |
download | spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.tar.gz spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.tar.bz2 spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.zip |
[SPARK-6627] Finished rename to ShuffleBlockResolver
The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager
to ShuffleBlockResolver, but didn't rename the associated subclasses and
variables; this commit does that.
I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class?
cc pwendell
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits:
43add1e [Kay Ousterhout] Spacing fix
96080bf [Kay Ousterhout] Test fixes
d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver
Diffstat (limited to 'core')
10 files changed, 36 insertions, 40 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index e9b4e2b955..6ad427bcac 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -18,7 +18,6 @@ package org.apache.spark.shuffle import java.io.File -import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger @@ -29,7 +28,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup +import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup import org.apache.spark.storage._ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} @@ -64,9 +63,8 @@ private[spark] trait ShuffleWriterGroup { * files within a ShuffleFileGroups associated with the block's reducer. */ // Note: Changes to the format in this file should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData(). -private[spark] -class FileShuffleBlockManager(conf: SparkConf) +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData(). +private[spark] class FileShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver with Logging { private val transportConf = SparkTransportConf.fromSparkConf(conf) @@ -242,8 +240,7 @@ class FileShuffleBlockManager(conf: SparkConf) } } -private[spark] -object FileShuffleBlockManager { +private[spark] object FileShuffleBlockResolver { /** * A group of shuffle files, one per reducer. * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index a1741e2875..d9c63b6e7b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -18,7 +18,6 @@ package org.apache.spark.shuffle import java.io._ -import java.nio.ByteBuffer import com.google.common.io.ByteStreams @@ -28,7 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.storage._ import org.apache.spark.util.Utils -import IndexShuffleBlockManager.NOOP_REDUCE_ID +import IndexShuffleBlockResolver.NOOP_REDUCE_ID /** * Create and maintain the shuffle blocks' mapping between logic block and physical file location. @@ -40,9 +39,8 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID * */ // Note: Changes to the format in this file should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData(). -private[spark] -class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver { +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData(). +private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver { private lazy val blockManager = SparkEnv.get.blockManager @@ -115,7 +113,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver { override def stop(): Unit = {} } -private[spark] object IndexShuffleBlockManager { +private[spark] object IndexShuffleBlockResolver { // No-op reduce ID used in interactions with disk store and BlockObjectWriter. // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort // shuffle outputs for several reduces are glommed into a single file. diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala index 2a7df8dd5b..c089088f40 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.shuffle._ */ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager { - private val fileShuffleBlockManager = new FileShuffleBlockManager(conf) + private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf) /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ override def registerShuffle[K, V, C]( @@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager shuffleBlockResolver.removeShuffle(shuffleId) } - override def shuffleBlockResolver: FileShuffleBlockManager = { - fileShuffleBlockManager + override def shuffleBlockResolver: FileShuffleBlockResolver = { + fileShuffleBlockResolver } /** Shut down this ShuffleManager. */ diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index cd27c9e07a..897f0a5dc5 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -25,7 +25,7 @@ import org.apache.spark.shuffle._ import org.apache.spark.storage.BlockObjectWriter private[spark] class HashShuffleWriter[K, V]( - shuffleBlockManager: FileShuffleBlockManager, + shuffleBlockResolver: FileShuffleBlockResolver, handle: BaseShuffleHandle[K, V, _], mapId: Int, context: TaskContext) @@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V]( private val blockManager = SparkEnv.get.blockManager private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) - private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, + private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics) /** Write a bunch of records to this task's output */ diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 0497036192..15842941da 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { - private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf) + private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf) private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() /** @@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager true } - override def shuffleBlockResolver: IndexShuffleBlockManager = { - indexShuffleBlockManager + override def shuffleBlockResolver: IndexShuffleBlockResolver = { + indexShuffleBlockResolver } /** Shut down this ShuffleManager. */ diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index a066435df6..add2656294 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus -import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle} import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter private[spark] class SortShuffleWriter[K, V, C]( - shuffleBlockManager: IndexShuffleBlockManager, + shuffleBlockResolver: IndexShuffleBlockResolver, handle: BaseShuffleHandle[K, V, C], mapId: Int, context: TaskContext) @@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C]( // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). - val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) - val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID) + val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) + val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) - shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) + shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } @@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C]( return Option(mapStatus) } else { // The map task failed, so delete our output data. - shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId) + shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId) return None } } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index c186fd360f..524f697099 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { } // Format of the shuffle block ids (including data and index) should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData(). +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a46fecd227..cc794e5c90 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -431,10 +431,11 @@ private[spark] class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { - val shuffleBlockManager = shuffleManager.shuffleBlockResolver + val shuffleBlockResolver = shuffleManager.shuffleBlockResolver // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. - Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) + Option( + shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) } else { doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 5764c16902..2a4447705f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with - // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile(). + // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala index 84384bb489..0537bf66ad 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.shuffle.FileShuffleBlockManager +import org.apache.spark.shuffle.FileShuffleBlockResolver import org.apache.spark.storage.{ShuffleBlockId, FileSegment} class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { @@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) - val shuffleBlockManager = - SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager] + val shuffleBlockResolver = + SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver] - val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf), + val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf), new ShuffleWriteMetrics) for (writer <- shuffle1.writers) { writer.write("test1", "value") @@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { val shuffle1Segment = shuffle1.writers(0).fileSegment() shuffle1.releaseWriters(success = true) - val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf), + val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf), new ShuffleWriteMetrics) for (writer <- shuffle2.writers) { @@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { // of block based on remaining data in file : which could mess things up when there is // concurrent read and writes happening to the same shuffle group. - val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), + val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf), new ShuffleWriteMetrics) for (writer <- shuffle3.writers) { writer.write("test3", "value") @@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { writer.commitAndClose() } // check before we register. - checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0))) + checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0))) shuffle3.releaseWriters(success = true) - checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0))) - shuffleBlockManager.removeShuffle(1) + checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0))) + shuffleBlockResolver.removeShuffle(1) } def writeToFile(file: File, numBytes: Int) { |