author     Reynold Xin <rxin@databricks.com>  2016-04-18 19:30:00 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-04-18 19:30:00 -0700
commit     5e92583d38e11d39deb429a39725443111205a4a (patch)
tree       3ca0408257968f37a7e5e4d0c35ed05f449c145d /core
parent     4b3d1294aeecc0001a7fa48c92796e6075d34540 (diff)
download   spark-5e92583d38e11d39deb429a39725443111205a4a.tar.gz
           spark-5e92583d38e11d39deb429a39725443111205a4a.tar.bz2
           spark-5e92583d38e11d39deb429a39725443111205a4a.zip
[SPARK-14667] Remove HashShuffleManager
## What changes were proposed in this pull request?

The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager.

## How was this patch tested?

Removed some tests related to the old manager.

Author: Reynold Xin <rxin@databricks.com>

Closes #12423 from rxin/SPARK-14667.
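For reference, a minimal, hypothetical sketch of how a job selects the shuffle manager after this change. Only the `spark.shuffle.manager` key and its `"sort"` default come from the patch (see the SparkEnv.scala hunk below); the app name, master, and example job are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleManagerExample {
  def main(args: Array[String]): Unit = {
    // "sort" is the default, so setting it explicitly is optional; "hash" is no longer accepted.
    val conf = new SparkConf()
      .setAppName("shuffle-manager-example") // hypothetical app name
      .setMaster("local[2]")
      .set("spark.shuffle.manager", "sort")

    val sc = new SparkContext(conf)
    // Any shuffle-producing job (reduceByKey here) now goes through the sort-based shuffle.
    val counts = sc.parallelize(1 to 1000).map(i => (i % 10, 1)).reduceByKey(_ + _).collect()
    println(counts.mkString(", "))
    sc.stop()
  }
}
```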
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                                     5
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala           136
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala             81
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala             141
-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala                         81
-rw-r--r--  core/src/test/scala/org/apache/spark/HashShuffleSuite.scala                            30
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala         4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                    4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala   2
9 files changed, 7 insertions, 477 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 3d11db7461..27497e21b8 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -298,9 +298,8 @@ object SparkEnv extends Logging {
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
- "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
- "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
- "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+ "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
+ "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
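A small, REPL-style sketch of the short-name resolution that the hunk above leaves in place: the map contents are copied from the added lines, while the `resolve` helper and the example values are illustrative only, not code added by this commit.

```scala
// Mirrors the lookup in SparkEnv.scala after this patch.
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)

def resolve(userValue: String): String =
  shortShuffleMgrNames.getOrElse(userValue.toLowerCase, userValue)

resolve("sort")          // "org.apache.spark.shuffle.sort.SortShuffleManager"
resolve("tungsten-sort") // same class; the alias is kept for configuration compatibility
resolve("hash")          // falls through as a literal class name, so instantiation now fails
```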
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
deleted file mode 100644
index be1e84a2ba..0000000000
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage._
-import org.apache.spark.util.Utils
-
-/** A group of writers for a ShuffleMapTask, one writer per reducer. */
-private[spark] trait ShuffleWriterGroup {
- val writers: Array[DiskBlockObjectWriter]
-
- /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
- def releaseWriters(success: Boolean): Unit
-}
-
-/**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer.
- */
-// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
-private[spark] class FileShuffleBlockResolver(conf: SparkConf)
- extends ShuffleBlockResolver with Logging {
-
- private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
-
- private lazy val blockManager = SparkEnv.get.blockManager
-
- // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
- private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
-
- /**
- * Contains all the state related to a particular shuffle.
- */
- private class ShuffleState(val numReducers: Int) {
- /**
- * The mapIds of all map tasks completed on this Executor for this shuffle.
- */
- val completedMapTasks = new ConcurrentLinkedQueue[Int]()
- }
-
- private val shuffleStates = new ConcurrentHashMap[ShuffleId, ShuffleState]
-
- /**
- * Get a ShuffleWriterGroup for the given map task, which will register it as complete
- * when the writers are closed successfully
- */
- def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
- writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
- new ShuffleWriterGroup {
- private val shuffleState: ShuffleState = {
- // Note: we do _not_ want to just wrap this java ConcurrentHashMap into a Scala map and use
- // .getOrElseUpdate() because that's actually NOT atomic.
- shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
- shuffleStates.get(shuffleId)
- }
- val openStartTime = System.nanoTime
- val serializerInstance = serializer.newInstance()
- val writers: Array[DiskBlockObjectWriter] = {
- Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
- val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- val blockFile = blockManager.diskBlockManager.getFile(blockId)
- val tmp = Utils.tempFileWith(blockFile)
- blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics)
- }
- }
- // Creating the file to write to and creating a disk writer both involve interacting with
- // the disk, so should be included in the shuffle write time.
- writeMetrics.incWriteTime(System.nanoTime - openStartTime)
-
- override def releaseWriters(success: Boolean) {
- shuffleState.completedMapTasks.add(mapId)
- }
- }
- }
-
- override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
- val file = blockManager.diskBlockManager.getFile(blockId)
- new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
- }
-
- /** Remove all the blocks / files and metadata related to a particular shuffle. */
- def removeShuffle(shuffleId: ShuffleId): Boolean = {
- // Do not change the ordering of this, if shuffleStates should be removed only
- // after the corresponding shuffle blocks have been removed
- val cleaned = removeShuffleBlocks(shuffleId)
- shuffleStates.remove(shuffleId)
- cleaned
- }
-
- /** Remove all the blocks / files related to a particular shuffle. */
- private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
- Option(shuffleStates.get(shuffleId)) match {
- case Some(state) =>
- for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
- val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
- val file = blockManager.diskBlockManager.getFile(blockId)
- if (!file.delete()) {
- logWarning(s"Error deleting ${file.getPath()}")
- }
- }
- logInfo("Deleted all files for shuffle " + shuffleId)
- true
- case None =>
- logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
- false
- }
- }
-
- override def stop(): Unit = {}
-}
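The scaladoc of the deleted resolver notes that each map task got one writer, and therefore one file, per reducer. A rough, REPL-style illustration of why that does not scale, assuming the usual sort-shuffle layout of one data file plus one index file per map task; the task counts are made up.

```scala
// Hypothetical numbers; only the one-file-per-reducer behaviour comes from the deleted code.
def hashShuffleFiles(mapTasks: Int, reducers: Int): Long = mapTasks.toLong * reducers
def sortShuffleFiles(mapTasks: Int): Long = 2L * mapTasks // data file + index file per map task

hashShuffleFiles(1000, 1000) // 1,000,000 shuffle files for one stage
sortShuffleFiles(1000)       //     2,000 shuffle files
```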
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
deleted file mode 100644
index 6bb4ff94b5..0000000000
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.shuffle._
-
-/**
- * A ShuffleManager using hashing, that creates one output file per reduce partition on each
- * mapper (possibly reusing these across waves of tasks).
- */
-private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
-
- if (!conf.getBoolean("spark.shuffle.spill", true)) {
- logWarning(
- "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
- " Shuffle will continue to spill to disk when necessary.")
- }
-
- private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
-
- override val shortName: String = "hash"
-
- /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
- override def registerShuffle[K, V, C](
- shuffleId: Int,
- numMaps: Int,
- dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
- new BaseShuffleHandle(shuffleId, numMaps, dependency)
- }
-
- /**
- * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
- * Called on executors by reduce tasks.
- */
- override def getReader[K, C](
- handle: ShuffleHandle,
- startPartition: Int,
- endPartition: Int,
- context: TaskContext): ShuffleReader[K, C] = {
- new BlockStoreShuffleReader(
- handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
- }
-
- /** Get a writer for a given partition. Called on executors by map tasks. */
- override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
- : ShuffleWriter[K, V] = {
- new HashShuffleWriter(
- shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
- }
-
- /** Remove a shuffle's metadata from the ShuffleManager. */
- override def unregisterShuffle(shuffleId: Int): Boolean = {
- shuffleBlockResolver.removeShuffle(shuffleId)
- }
-
- override def shuffleBlockResolver: FileShuffleBlockResolver = {
- fileShuffleBlockResolver
- }
-
- /** Shut down this ShuffleManager. */
- override def stop(): Unit = {
- shuffleBlockResolver.stop()
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
deleted file mode 100644
index 6c4444ffb4..0000000000
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import java.io.IOException
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle._
-import org.apache.spark.storage.DiskBlockObjectWriter
-
-private[spark] class HashShuffleWriter[K, V](
- shuffleBlockResolver: FileShuffleBlockResolver,
- handle: BaseShuffleHandle[K, V, _],
- mapId: Int,
- context: TaskContext)
- extends ShuffleWriter[K, V] with Logging {
-
- private val dep = handle.dependency
- private val numOutputSplits = dep.partitioner.numPartitions
- private val metrics = context.taskMetrics
-
- // Are we in the process of stopping? Because map tasks can call stop() with success = true
- // and then call stop() with success = false if they get an exception, we want to make sure
- // we don't try deleting files, etc twice.
- private var stopping = false
-
- private val writeMetrics = metrics.shuffleWriteMetrics
-
- private val blockManager = SparkEnv.get.blockManager
- private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits,
- dep.serializer, writeMetrics)
-
- /** Write a bunch of records to this task's output */
- override def write(records: Iterator[Product2[K, V]]): Unit = {
- val iter = if (dep.aggregator.isDefined) {
- if (dep.mapSideCombine) {
- dep.aggregator.get.combineValuesByKey(records, context)
- } else {
- records
- }
- } else {
- require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
- records
- }
-
- for (elem <- iter) {
- val bucketId = dep.partitioner.getPartition(elem._1)
- shuffle.writers(bucketId).write(elem._1, elem._2)
- }
- }
-
- /** Close this writer, passing along whether the map completed */
- override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
- var success = initiallySuccess
- try {
- if (stopping) {
- return None
- }
- stopping = true
- if (success) {
- try {
- Some(commitWritesAndBuildStatus())
- } catch {
- case e: Exception =>
- success = false
- revertWrites()
- throw e
- }
- } else {
- revertWrites()
- None
- }
- } finally {
- // Release the writers back to the shuffle block manager.
- if (shuffle != null && shuffle.writers != null) {
- try {
- shuffle.releaseWriters(success)
- } catch {
- case e: Exception => logError("Failed to release shuffle writers", e)
- }
- }
- }
- }
-
- private def commitWritesAndBuildStatus(): MapStatus = {
- // Commit the writes. Get the size of each bucket block (total block size).
- val sizes: Array[Long] = shuffle.writers.map { writer: DiskBlockObjectWriter =>
- writer.commitAndClose()
- writer.fileSegment().length
- }
- // rename all shuffle files to final paths
- // Note: there is only one ShuffleBlockResolver in executor
- shuffleBlockResolver.synchronized {
- shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
- val output = blockManager.diskBlockManager.getFile(writer.blockId)
- if (sizes(i) > 0) {
- if (output.exists()) {
- // Use length of existing file and delete our own temporary one
- sizes(i) = output.length()
- writer.file.delete()
- } else {
- // Commit by renaming our temporary file to something the fetcher expects
- if (!writer.file.renameTo(output)) {
- throw new IOException(s"fail to rename ${writer.file} to $output")
- }
- }
- } else {
- if (output.exists()) {
- output.delete()
- }
- }
- }
- }
- MapStatus(blockManager.shuffleServerId, sizes)
- }
-
- private def revertWrites(): Unit = {
- if (shuffle != null && shuffle.writers != null) {
- for (writer <- shuffle.writers) {
- writer.revertPartialWritesAndClose()
- }
- }
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index f98150536d..69ff6c7c28 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -30,7 +30,6 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage._
@@ -39,7 +38,7 @@ import org.apache.spark.storage._
* suitable for cleaner tests and provides some utility functions. Subclasses can use different
* config options, in particular, a different shuffle manager class
*/
-abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[HashShuffleManager])
+abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
implicit val defaultTimeout = timeout(10000 millis)
@@ -354,84 +353,6 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
/**
- * A copy of the shuffle tests for sort-based shuffle
- */
-class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[SortShuffleManager]) {
- test("cleanup shuffle") {
- val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
- val collected = rdd.collect().toList
- val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
-
- // Explicit cleanup
- shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
- tester.assertCleanup()
-
- // Verify that shuffles can be re-executed after cleaning up
- assert(rdd.collect().toList.equals(collected))
- }
-
- test("automatically cleanup shuffle") {
- var rdd = newShuffleRDD()
- rdd.count()
-
- // Test that GC does not cause shuffle cleanup due to a strong reference
- val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
- runGC()
- intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
- }
- rdd.count() // Defeat early collection by the JVM
-
- // Test that GC causes shuffle cleanup after dereferencing the RDD
- val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
- rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
- runGC()
- postGCTester.assertCleanup()
- }
-
- test("automatically cleanup RDD + shuffle + broadcast in distributed mode") {
- sc.stop()
-
- val conf2 = new SparkConf()
- .setMaster("local-cluster[2, 1, 1024]")
- .setAppName("ContextCleanerSuite")
- .set("spark.cleaner.referenceTracking.blocking", "true")
- .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
- .set("spark.shuffle.manager", shuffleManager.getName)
- sc = new SparkContext(conf2)
-
- val numRdds = 10
- val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast).toBuffer
- val rddIds = sc.persistentRdds.keys.toSeq
- val shuffleIds = 0 until sc.newShuffleId()
- val broadcastIds = broadcastBuffer.map(_.id)
-
- val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
- runGC()
- intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
- }
-
- // Test that GC triggers the cleanup of all variables after the dereferencing them
- val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
- broadcastBuffer.clear()
- rddBuffer.clear()
- runGC()
- postGCTester.assertCleanup()
-
- // Make sure the broadcasted task closure no longer exists after GC.
- val taskClosureBroadcastId = broadcastIds.max + 1
- assert(sc.env.blockManager.master.getMatchingBlockIds({
- case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
- case _ => false
- }, askSlaves = true).isEmpty)
- }
-}
-
-
-/**
* Class to test whether RDDs, shuffles, etc. have been successfully cleaned.
* The checkpoint here refers only to normal (reliable) checkpoints, not local checkpoints.
*/
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
deleted file mode 100644
index 10794235ed..0000000000
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.BeforeAndAfterAll
-
-class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
-
- // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
-
- override def beforeAll() {
- super.beforeAll()
- conf.set("spark.shuffle.manager", "hash")
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d26df7e760..d14728cb50 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
/** Testsuite that tests block replication in BlockManager */
@@ -44,7 +44,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
private var master: BlockManagerMaster = null
private val securityMgr = new SecurityManager(conf)
private val mapOutputTracker = new MapOutputTrackerMaster(conf)
- private val shuffleManager = new HashShuffleManager(conf)
+ private val shuffleManager = new SortShuffleManager(conf)
// List of block manager created during an unit test, so that all of the them can be stopped
// after the unit test.
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a1c2933584..db1efaf2a2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -60,7 +60,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
var master: BlockManagerMaster = null
val securityMgr = new SecurityManager(new SparkConf(false))
val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
- val shuffleManager = new HashShuffleManager(new SparkConf(false))
+ val shuffleManager = new SortShuffleManager(new SparkConf(false))
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index dc3185a6d5..2410118fb7 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -237,7 +237,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val size = 1000
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
- conf.set("spark.shuffle.manager", "hash") // avoid using external sorter
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
@@ -401,7 +400,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("external aggregation updates peak execution memory") {
val spillThreshold = 1000
val conf = createSparkConf(loadDefaults = false)
- .set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
.set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
sc = new SparkContext("local", "test", conf)
// No spilling