author     Kay Ousterhout <kayousterhout@gmail.com>    2015-05-08 12:24:06 -0700
committer  Josh Rosen <joshrosen@databricks.com>       2015-05-08 12:24:06 -0700
commit     4b3bb0e43ca7e1a27308516608419487b6a844e6 (patch)
tree       07327d1d8cf2c8a8bc34217c706d3d761e61c31e
parent     2d05f325dc3c70349bd17ed399897f22d967c687 (diff)
download   spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.tar.gz
           spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.tar.bz2
           spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.zip
[SPARK-6627] Finished rename to ShuffleBlockResolver
The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager
to ShuffleBlockResolver, but didn't rename the associated subclasses and
variables; this commit does that.

I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since
that's technically a public class?

cc pwendell

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits:

43add1e [Kay Ousterhout] Spacing fix
96080bf [Kay Ousterhout] Test fixes
d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver
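For orientation, the interface behind this rename is small. Here is a minimal sketch of the trait both renamed classes implement, pieced together from the hunks below (signatures inferred from the call sites, not verbatim):

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.ShuffleBlockId

// Sketch: both FileShuffleBlockResolver and IndexShuffleBlockResolver extend
// this trait. getBlockData is what BlockManager calls for shuffle blocks
// (see the BlockManager.scala hunk below); stop() is a lifecycle hook that
// IndexShuffleBlockResolver overrides as a no-op.
private[spark] trait ShuffleBlockResolver {
  def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
  def stop(): Unit
}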
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala (renamed from core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala)  | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala (renamed from core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala)  | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala  | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala  | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala  | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala  | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockId.scala  | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala  | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala  | 18
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java  | 6
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java (renamed from network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java)  | 16
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java  | 16
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java (renamed from network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java)  | 30
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java  | 37
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java  | 8
16 files changed, 94 insertions(+), 95 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index e9b4e2b955..6ad427bcac 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle
import java.io.File
-import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@@ -29,7 +28,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -64,9 +63,8 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
-private[spark]
-class FileShuffleBlockManager(conf: SparkConf)
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
+private[spark] class FileShuffleBlockResolver(conf: SparkConf)
extends ShuffleBlockResolver with Logging {
private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -242,8 +240,7 @@ class FileShuffleBlockManager(conf: SparkConf)
}
}
-private[spark]
-object FileShuffleBlockManager {
+private[spark] object FileShuffleBlockResolver {
/**
* A group of shuffle files, one per reducer.
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
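To make the consolidation comment above concrete, here is a minimal sketch of the ShuffleFileGroup idea (simplified; the real class also records per-mapper offsets so that individual segments can be served back later):

import java.io.File

// Sketch: one consolidated file per reducer; every mapper assigned to this
// group appends its output for reducer i to files(i).
private class ShuffleFileGroupSketch(
    val shuffleId: Int,
    val fileId: Int,
    val files: Array[File]) {
  def apply(bucketId: Int): File = files(bucketId)
}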
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index a1741e2875..d9c63b6e7b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle
import java.io._
-import java.nio.ByteBuffer
import com.google.common.io.ByteStreams
@@ -28,7 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
import org.apache.spark.util.Utils
-import IndexShuffleBlockManager.NOOP_REDUCE_ID
+import IndexShuffleBlockResolver.NOOP_REDUCE_ID
/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -40,9 +39,8 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID
*
*/
// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
-private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
private lazy val blockManager = SparkEnv.get.blockManager
@@ -115,7 +113,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
override def stop(): Unit = {}
}
-private[spark] object IndexShuffleBlockManager {
+private[spark] object IndexShuffleBlockResolver {
// No-op reduce ID used in interactions with disk store and BlockObjectWriter.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
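A short illustration of the comment above: because sort shuffle writes a single data file per map task, the block id used for that file carries a placeholder reduce id (0 in the sources of this era), which is exactly what SortShuffleWriter passes further down in this diff:

import org.apache.spark.storage.ShuffleBlockId

// NOOP_REDUCE_ID is 0; the resulting name matches the
// "shuffle_ShuffleId_MapId_0" format the external shuffle code expects.
val NOOP_REDUCE_ID = 0
val blockId = ShuffleBlockId(shuffleId = 3, mapId = 7, reduceId = NOOP_REDUCE_ID)
// blockId.name == "shuffle_3_7_0"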
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index 2a7df8dd5b..c089088f40 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.shuffle._
*/
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
- private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+ private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
@@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
shuffleBlockResolver.removeShuffle(shuffleId)
}
- override def shuffleBlockResolver: FileShuffleBlockManager = {
- fileShuffleBlockManager
+ override def shuffleBlockResolver: FileShuffleBlockResolver = {
+ fileShuffleBlockResolver
}
/** Shut down this ShuffleManager. */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index cd27c9e07a..897f0a5dc5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle._
import org.apache.spark.storage.BlockObjectWriter
private[spark] class HashShuffleWriter[K, V](
- shuffleBlockManager: FileShuffleBlockManager,
+ shuffleBlockResolver: FileShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, _],
mapId: Int,
context: TaskContext)
@@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V](
private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
- private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
+ private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
writeMetrics)
/** Write a bunch of records to this task's output */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 0497036192..15842941da 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
- private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
+ private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
/**
@@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
true
}
- override def shuffleBlockResolver: IndexShuffleBlockManager = {
- indexShuffleBlockManager
+ override def shuffleBlockResolver: IndexShuffleBlockResolver = {
+ indexShuffleBlockResolver
}
/** Shut down this ShuffleManager. */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index a066435df6..add2656294 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort
import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
private[spark] class SortShuffleWriter[K, V, C](
- shuffleBlockManager: IndexShuffleBlockManager,
+ shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, C],
mapId: Int,
context: TaskContext)
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
- val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
- val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
+ val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+ val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
- shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
+ shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
@@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C](
return Option(mapStatus)
} else {
// The map task failed, so delete our output data.
- shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
+ shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
return None
}
} finally {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c186fd360f..524f697099 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
}
// Format of the shuffle block ids (including data and index) should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
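A quick worked example of the naming contract that the comment above asks to keep in sync; the external shuffle service parses this exact string back into its three components:

import org.apache.spark.storage.ShuffleBlockId

val id = ShuffleBlockId(shuffleId = 1, mapId = 2, reduceId = 0)
assert(id.name == "shuffle_1_2_0")
// ExternalShuffleBlockResolver#getBlockData splits on '_' to recover
// (shuffleId, mapId, reduceId) on the service side.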
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a46fecd227..cc794e5c90 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -431,10 +431,11 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+ val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
- Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
+ Option(
+ shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5764c16902..2a4447705f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
- // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
+ // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
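For context on why this sync comment matters: DiskBlockManager.getFile and ExternalShuffleBlockResolver#getFile must map the same filename to the same subdirectory, or the external service cannot find blocks the executor wrote. A sketch of the shared scheme follows; the directory arithmetic is my reading of the era's sources, not verbatim:

import java.io.File

def getFileSketch(localDirs: Array[String], subDirsPerLocalDir: Int, filename: String): File = {
  // Stand-in for Utils.nonNegativeHash (which also handles Int.MinValue).
  val hash = math.abs(filename.hashCode)
  val localDir = localDirs(hash % localDirs.length)
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  // Subdirectories are named by the two-digit hex of subDirId.
  new File(new File(localDir, "%02x".format(subDirId)), filename)
}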
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 84384bb489..0537bf66ad 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.FileShuffleBlockManager
+import org.apache.spark.shuffle.FileShuffleBlockResolver
import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
@@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)
- val shuffleBlockManager =
- SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
+ val shuffleBlockResolver =
+ SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]
- val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
+ val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle1.writers) {
writer.write("test1", "value")
@@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
val shuffle1Segment = shuffle1.writers(0).fileSegment()
shuffle1.releaseWriters(success = true)
- val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
+ val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle2.writers) {
@@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
// of block based on remaining data in file : which could mess things up when there is
// concurrent read and writes happening to the same shuffle group.
- val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
+ val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
new ShuffleWriteMetrics)
for (writer <- shuffle3.writers) {
writer.write("test3", "value")
@@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
writer.commitAndClose()
}
// check before we register.
- checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
+ checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffle3.releaseWriters(success = true)
- checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
- shuffleBlockManager.removeShuffle(1)
+ checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
+ shuffleBlockResolver.removeShuffle(1)
}
def writeToFile(file: File, numBytes: Int) {
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 46ca970862..e4faaf8854 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -46,18 +46,18 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle;
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
- private final ExternalShuffleBlockManager blockManager;
+ private final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
public ExternalShuffleBlockHandler(TransportConf conf) {
- this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
+ this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
}
/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
ExternalShuffleBlockHandler(
OneForOneStreamManager streamManager,
- ExternalShuffleBlockManager blockManager) {
+ ExternalShuffleBlockResolver blockManager) {
this.streamManager = streamManager;
this.blockManager = blockManager;
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 93e6fdd716..dd08e24cad 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -44,13 +44,13 @@ import org.apache.spark.network.util.TransportConf;
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
* of Executors. Each Executor must register its own configuration about where it stores its files
* (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager.
+ * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
*
* Executors with shuffle file consolidation are not currently supported, as the index is stored in
- * the Executor's memory, unlike the IndexShuffleBlockManager.
+ * the Executor's memory, unlike the IndexShuffleBlockResolver.
*/
-public class ExternalShuffleBlockManager {
- private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
+public class ExternalShuffleBlockResolver {
+ private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
// Map containing all registered executors' metadata.
private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
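The two fields above drive the whole class: executor metadata is keyed by (appId, execId), and getBlockData fails fast for unregistered executors, which is the behavior the test suites below exercise. A Scala sketch of that flow, with names simplified and the key rendered as a plain tuple:

import java.util.concurrent.ConcurrentHashMap

case class ExecutorInfoSketch(localDirs: Array[String], subDirsPerLocalDir: Int, shuffleManager: String)

val executors = new ConcurrentHashMap[(String, String), ExecutorInfoSketch]()

def registerExecutor(appId: String, execId: String, info: ExecutorInfoSketch): Unit =
  executors.put((appId, execId), info)

def getBlockDataSketch(appId: String, execId: String, blockId: String): Unit = {
  val executor = executors.get((appId, execId))
  if (executor == null) {
    throw new RuntimeException(s"Executor is not registered (appId=$appId, execId=$execId)")
  }
  // Dispatch on executor.shuffleManager: hash-based shuffle reads one file
  // per block; sort-based shuffle reads a segment of the per-map data file.
}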
@@ -60,7 +60,7 @@ public class ExternalShuffleBlockManager {
private final TransportConf conf;
- public ExternalShuffleBlockManager(TransportConf conf) {
+ public ExternalShuffleBlockResolver(TransportConf conf) {
this(conf, Executors.newSingleThreadExecutor(
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
@@ -68,7 +68,7 @@ public class ExternalShuffleBlockManager {
// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
- ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
+ ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
@@ -168,7 +168,7 @@ public class ExternalShuffleBlockManager {
/**
* Hash-based shuffle data is simply stored as one file per block.
- * This logic is from FileShuffleBlockManager.
+ * This logic is from FileShuffleBlockResolver.
*/
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
@@ -178,7 +178,7 @@ public class ExternalShuffleBlockManager {
/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
- * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager,
+ * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
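To ground the comment above: the .index file is a flat sequence of 8-byte cumulative offsets, so entries reduceId and reduceId + 1 bound one reducer's segment in the .data file. A sketch of the lookup, reflecting my reading of the era's logic rather than the verbatim method:

import java.io.{DataInputStream, File, FileInputStream}

def sortBlockSegment(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    // Each offset is one long; skip to this reducer's entry.
    in.skipBytes(reduceId * 8)
    val offset = in.readLong()
    val nextOffset = in.readLong()
    (offset, nextOffset - offset) // (start of segment, segment length)
  } finally {
    in.close()
  }
}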
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index 3f9fe1681c..73374cdc77 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite {
TransportClient client = mock(TransportClient.class);
OneForOneStreamManager streamManager;
- ExternalShuffleBlockManager blockManager;
+ ExternalShuffleBlockResolver blockResolver;
RpcHandler handler;
@Before
public void beforeEach() {
streamManager = mock(OneForOneStreamManager.class);
- blockManager = mock(ExternalShuffleBlockManager.class);
- handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
+ blockResolver = mock(ExternalShuffleBlockResolver.class);
+ handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
}
@Test
@@ -62,7 +62,7 @@ public class ExternalShuffleBlockHandlerSuite {
ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray();
handler.receive(client, registerMessage, callback);
- verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
+ verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
verify(callback, times(1)).onSuccess((byte[]) any());
verify(callback, never()).onFailure((Throwable) any());
@@ -75,12 +75,12 @@ public class ExternalShuffleBlockHandlerSuite {
ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
- when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
+ when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
+ when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray();
handler.receive(client, openBlocks, callback);
- verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
- verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
verify(callback, times(1)).onSuccess(response.capture());
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index dad6428a83..d02f4f0fdb 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -30,7 +30,7 @@ import org.junit.Test;
import static org.junit.Assert.*;
-public class ExternalShuffleBlockManagerSuite {
+public class ExternalShuffleBlockResolverSuite {
static String sortBlock0 = "Hello!";
static String sortBlock1 = "World!";
@@ -60,29 +60,29 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testBadRequests() {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
// Unregistered executor
try {
- manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
fail("Should have failed");
} catch (RuntimeException e) {
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
}
// Invalid shuffle manager
- manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
+ resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
try {
- manager.getBlockData("app0", "exec2", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
fail("Should have failed");
} catch (UnsupportedOperationException e) {
// pass
}
// Nonexistent shuffle block
- manager.registerExecutor("app0", "exec3",
+ resolver.registerExecutor("app0", "exec3",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
try {
- manager.getBlockData("app0", "exec3", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
fail("Should have failed");
} catch (Exception e) {
// pass
@@ -91,18 +91,18 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
- manager.registerExecutor("app0", "exec0",
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
InputStream block0Stream =
- manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
block0Stream.close();
assertEquals(sortBlock0, block0);
InputStream block1Stream =
- manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
block1Stream.close();
assertEquals(sortBlock1, block1);
@@ -110,18 +110,18 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
- manager.registerExecutor("app0", "exec0",
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
InputStream block0Stream =
- manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
block0Stream.close();
assertEquals(hashBlock0, block0);
InputStream block1Stream =
- manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
block1Stream.close();
assertEquals(hashBlock1, block1);
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 254e3a7a32..d9d9c1bf2f 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -41,14 +41,15 @@ public class ExternalShuffleCleanupSuite {
public void noCleanupAndCleanup() throws IOException {
TestShuffleDataContext dataContext = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
- manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app", false /* cleanup */);
+ ExternalShuffleBlockResolver resolver =
+ new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ resolver.applicationRemoved("app", false /* cleanup */);
assertStillThere(dataContext);
- manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app", true /* cleanup */);
+ resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+ resolver.applicationRemoved("app", true /* cleanup */);
assertCleanedUp(dataContext);
}
@@ -64,7 +65,7 @@ public class ExternalShuffleCleanupSuite {
@Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
};
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor);
+ ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, noThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", true);
@@ -81,11 +82,12 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
+ ExternalShuffleBlockResolver resolver =
+ new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
- manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
- manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app", true);
+ resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+ resolver.applicationRemoved("app", true);
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
@@ -96,25 +98,26 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
+ ExternalShuffleBlockResolver resolver =
+ new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
- manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
- manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+ resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app-nonexistent", true);
+ resolver.applicationRemoved("app-nonexistent", true);
assertStillThere(dataContext0);
assertStillThere(dataContext1);
- manager.applicationRemoved("app-0", true);
+ resolver.applicationRemoved("app-0", true);
assertCleanedUp(dataContext0);
assertStillThere(dataContext1);
- manager.applicationRemoved("app-1", true);
+ resolver.applicationRemoved("app-1", true);
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
// Make sure it's not an error to cleanup multiple times
- manager.applicationRemoved("app-1", true);
+ resolver.applicationRemoved("app-1", true);
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 76639114df..3fdde054ab 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -29,7 +29,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
/**
* Manages some sort- and hash-based shuffle data, including the creation
- * and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}.
+ * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
*/
public class TestShuffleDataContext {
public final String[] localDirs;
@@ -61,9 +61,9 @@ public class TestShuffleDataContext {
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
OutputStream dataStream = new FileOutputStream(
- ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
DataOutputStream indexStream = new DataOutputStream(new FileOutputStream(
- ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
long offset = 0;
indexStream.writeLong(offset);
@@ -82,7 +82,7 @@ public class TestShuffleDataContext {
for (int i = 0; i < blocks.length; i ++) {
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
Files.write(blocks[i],
- ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId));
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
}
}