aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala18
1 files changed, 15 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index df98d18fa8..62e0629b34 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -25,6 +25,9 @@ import org.apache.spark.shuffle._
* mapper (possibly reusing these across waves of tasks).
*/
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
+
+ private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+
/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
shuffleId: Int,
@@ -49,12 +52,21 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
: ShuffleWriter[K, V] = {
- new HashShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+ new HashShuffleWriter(
+ shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}
/** Remove a shuffle's metadata from the ShuffleManager. */
- override def unregisterShuffle(shuffleId: Int): Unit = {}
+ override def unregisterShuffle(shuffleId: Int): Boolean = {
+ shuffleBlockManager.removeShuffle(shuffleId)
+ }
+
+ override def shuffleBlockManager: FileShuffleBlockManager = {
+ fileShuffleBlockManager
+ }
/** Shut down this ShuffleManager. */
- override def stop(): Unit = {}
+ override def stop(): Unit = {
+ shuffleBlockManager.stop()
+ }
}