diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala index df98d18fa8..62e0629b34 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala @@ -25,6 +25,9 @@ import org.apache.spark.shuffle._ * mapper (possibly reusing these across waves of tasks). */ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager { + + private val fileShuffleBlockManager = new FileShuffleBlockManager(conf) + /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ override def registerShuffle[K, V, C]( shuffleId: Int, @@ -49,12 +52,21 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager /** Get a writer for a given partition. Called on executors by map tasks. */ override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) : ShuffleWriter[K, V] = { - new HashShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + new HashShuffleWriter( + shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) } /** Remove a shuffle's metadata from the ShuffleManager. */ - override def unregisterShuffle(shuffleId: Int): Unit = {} + override def unregisterShuffle(shuffleId: Int): Boolean = { + shuffleBlockManager.removeShuffle(shuffleId) + } + + override def shuffleBlockManager: FileShuffleBlockManager = { + fileShuffleBlockManager + } /** Shut down this ShuffleManager. */ - override def stop(): Unit = {} + override def stop(): Unit = { + shuffleBlockManager.stop() + } } |