author    Kay Ousterhout <kayousterhout@gmail.com>    2015-05-08 12:24:06 -0700
committer Josh Rosen <joshrosen@databricks.com>       2015-05-08 12:24:06 -0700
commit 4b3bb0e43ca7e1a27308516608419487b6a844e6 (patch)
tree   07327d1d8cf2c8a8bc34217c706d3d761e61c31e /core
parent 2d05f325dc3c70349bd17ed399897f22d967c687 (diff)
[SPARK-6627] Finished rename to ShuffleBlockResolver
The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager to ShuffleBlockResolver, but didn't rename the associated subclasses and variables; this commit does that.

I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class? cc pwendell

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits:

43add1e [Kay Ousterhout] Spacing fix
96080bf [Kay Ousterhout] Test fixes
d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver
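For orientation before reading the diff, here is a hedged, stripped-down sketch of the naming structure the commit arrives at. The names mirror the commit, but the signatures are simplified: the real classes are private[spark], take a SparkConf, and return ManagedBuffers rather than byte arrays.

// Hypothetical, simplified sketch of the post-rename class relationships;
// not the real Spark source.
trait ShuffleBlockResolver {
  // Locate the bytes for one shuffle block, keyed by (shuffleId, mapId, reduceId).
  def getBlockData(shuffleId: Int, mapId: Int, reduceId: Int): Array[Byte]
  def stop(): Unit
}

// Hash-based shuffle: one file per (map, reduce) pair.
class FileShuffleBlockResolver extends ShuffleBlockResolver {
  override def getBlockData(shuffleId: Int, mapId: Int, reduceId: Int): Array[Byte] = Array.empty[Byte]
  override def stop(): Unit = ()
}

// Sort-based shuffle: one data file per map task plus an index file of offsets.
class IndexShuffleBlockResolver extends ShuffleBlockResolver {
  override def getBlockData(shuffleId: Int, mapId: Int, reduceId: Int): Array[Byte] = Array.empty[Byte]
  override def stop(): Unit = ()
}

// Each shuffle manager now exposes its resolver under the new name,
// as in the HashShuffleManager and SortShuffleManager hunks below.
class HashShuffleManager {
  private val fileShuffleBlockResolver = new FileShuffleBlockResolver
  def shuffleBlockResolver: FileShuffleBlockResolver = fileShuffleBlockResolver
}

class SortShuffleManager {
  private val indexShuffleBlockResolver = new IndexShuffleBlockResolver
  def shuffleBlockResolver: IndexShuffleBlockResolver = indexShuffleBlockResolver
}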
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala (renamed from core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala) | 11
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala (renamed from core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala) | 10
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 12
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockId.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 5
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala | 18
10 files changed, 36 insertions, 40 deletions
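The renames below are mechanical; the one piece of behavior worth recalling is what IndexShuffleBlockResolver resolves. Each sort-shuffle map task writes all of its reduce partitions into a single data file (hence the NOOP_REDUCE_ID placeholder in the block id), and writeIndexFile records cumulative offsets so a reducer's segment can be located. A hedged sketch of just that offset arithmetic, not the real implementation, which streams Longs to disk:

// Hypothetical illustration of the index-file offset math.
object IndexFileSketch {
  // The index stores the running sum of partition lengths, starting at 0,
  // so partition i occupies bytes offsets(i) until offsets(i + 1) of the data file.
  def offsets(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  def main(args: Array[String]): Unit = {
    val lengths = Array(10L, 0L, 25L)        // bytes written per reduce partition
    println(offsets(lengths).mkString(", ")) // prints: 0, 10, 10, 35
  }
}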
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index e9b4e2b955..6ad427bcac 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle
import java.io.File
-import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@@ -29,7 +28,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -64,9 +63,8 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
-private[spark]
-class FileShuffleBlockManager(conf: SparkConf)
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
+private[spark] class FileShuffleBlockResolver(conf: SparkConf)
extends ShuffleBlockResolver with Logging {
private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -242,8 +240,7 @@ class FileShuffleBlockManager(conf: SparkConf)
}
}
-private[spark]
-object FileShuffleBlockManager {
+private[spark] object FileShuffleBlockResolver {
/**
* A group of shuffle files, one per reducer.
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index a1741e2875..d9c63b6e7b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle
import java.io._
-import java.nio.ByteBuffer
import com.google.common.io.ByteStreams
@@ -28,7 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
import org.apache.spark.util.Utils
-import IndexShuffleBlockManager.NOOP_REDUCE_ID
+import IndexShuffleBlockResolver.NOOP_REDUCE_ID
/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -40,9 +39,8 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID
*
*/
// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
-private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
private lazy val blockManager = SparkEnv.get.blockManager
@@ -115,7 +113,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
override def stop(): Unit = {}
}
-private[spark] object IndexShuffleBlockManager {
+private[spark] object IndexShuffleBlockResolver {
// No-op reduce ID used in interactions with disk store and BlockObjectWriter.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index 2a7df8dd5b..c089088f40 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.shuffle._
*/
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
- private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+ private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
@@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
shuffleBlockResolver.removeShuffle(shuffleId)
}
- override def shuffleBlockResolver: FileShuffleBlockManager = {
- fileShuffleBlockManager
+ override def shuffleBlockResolver: FileShuffleBlockResolver = {
+ fileShuffleBlockResolver
}
/** Shut down this ShuffleManager. */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index cd27c9e07a..897f0a5dc5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle._
import org.apache.spark.storage.BlockObjectWriter
private[spark] class HashShuffleWriter[K, V](
- shuffleBlockManager: FileShuffleBlockManager,
+ shuffleBlockResolver: FileShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, _],
mapId: Int,
context: TaskContext)
@@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V](
private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
- private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
+ private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
writeMetrics)
/** Write a bunch of records to this task's output */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 0497036192..15842941da 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
- private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
+ private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
/**
@@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
true
}
- override def shuffleBlockResolver: IndexShuffleBlockManager = {
- indexShuffleBlockManager
+ override def shuffleBlockResolver: IndexShuffleBlockResolver = {
+ indexShuffleBlockResolver
}
/** Shut down this ShuffleManager. */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index a066435df6..add2656294 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort
import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
private[spark] class SortShuffleWriter[K, V, C](
- shuffleBlockManager: IndexShuffleBlockManager,
+ shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, C],
mapId: Int,
context: TaskContext)
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
- val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
- val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
+ val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+ val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
- shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
+ shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
@@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C](
return Option(mapStatus)
} else {
// The map task failed, so delete our output data.
- shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
+ shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
return None
}
} finally {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c186fd360f..524f697099 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
}
// Format of the shuffle block ids (including data and index) should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a46fecd227..cc794e5c90 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -431,10 +431,11 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+ val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
- Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
+ Option(
+ shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5764c16902..2a4447705f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
- // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
+ // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 84384bb489..0537bf66ad 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.FileShuffleBlockManager
+import org.apache.spark.shuffle.FileShuffleBlockResolver
import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
@@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)
- val shuffleBlockManager =
- SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
+ val shuffleBlockResolver =
+ SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]
- val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
+ val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle1.writers) {
writer.write("test1", "value")
@@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
val shuffle1Segment = shuffle1.writers(0).fileSegment()
shuffle1.releaseWriters(success = true)
- val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
+ val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle2.writers) {
@@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
// of block based on remaining data in file : which could mess things up when there is
// concurrent read and writes happening to the same shuffle group.
- val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
+ val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
new ShuffleWriteMetrics)
for (writer <- shuffle3.writers) {
writer.write("test3", "value")
@@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
writer.commitAndClose()
}
// check before we register.
- checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
+ checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffle3.releaseWriters(success = true)
- checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
- shuffleBlockManager.removeShuffle(1)
+ checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
+ shuffleBlockResolver.removeShuffle(1)
}
def writeToFile(file: File, numBytes: Int) {