author:    Reynold Xin <rxin@databricks.com>  2016-04-18 19:30:00 -0700
committer: Reynold Xin <rxin@databricks.com>  2016-04-18 19:30:00 -0700
commit:    5e92583d38e11d39deb429a39725443111205a4a
tree:      3ca0408257968f37a7e5e4d0c35ed05f449c145d /core/src/test/scala
parent:    4b3d1294aeecc0001a7fa48c92796e6075d34540
[SPARK-14667] Remove HashShuffleManager
## What changes were proposed in this pull request?
The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager.
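For reference, the shuffle implementation is selected through the `spark.shuffle.manager` setting. Below is a minimal sketch of a job that exercises the shuffle path under the sort-based manager; the master URL and app name are illustrative, and after this patch `"hash"` no longer resolves to a shuffle manager.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "sort" has been the default since Spark 1.2, so setting it explicitly is
// shown only for illustration. Before this patch, "hash" selected the
// now-removed HashShuffleManager.
val conf = new SparkConf()
  .setMaster("local[2]")            // illustrative master
  .setAppName("sort-shuffle-demo")  // illustrative app name
  .set("spark.shuffle.manager", "sort")

val sc = new SparkContext(conf)
// reduceByKey forces a shuffle through the configured manager.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .reduceByKey(_ + _)
  .collect()
sc.stop()
```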
## How was this patch tested?
Removed some tests related to the old manager.
Author: Reynold Xin <rxin@databricks.com>
Closes #12423 from rxin/SPARK-14667.
Diffstat (limited to 'core/src/test/scala')
5 files changed, 5 insertions, 116 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index f98150536d..69ff6c7c28 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -30,7 +30,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
 
@@ -39,7 +38,7 @@ import org.apache.spark.storage._
  * suitable for cleaner tests and provides some utility functions. Subclasses can use different
  * config options, in particular, a different shuffle manager class
  */
-abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[HashShuffleManager])
+abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
   extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
 {
   implicit val defaultTimeout = timeout(10000 millis)
@@ -354,84 +353,6 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
 
 
 /**
- * A copy of the shuffle tests for sort-based shuffle
- */
-class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[SortShuffleManager]) {
-  test("cleanup shuffle") {
-    val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
-    val collected = rdd.collect().toList
-    val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
-
-    // Explicit cleanup
-    shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
-    tester.assertCleanup()
-
-    // Verify that shuffles can be re-executed after cleaning up
-    assert(rdd.collect().toList.equals(collected))
-  }
-
-  test("automatically cleanup shuffle") {
-    var rdd = newShuffleRDD()
-    rdd.count()
-
-    // Test that GC does not cause shuffle cleanup due to a strong reference
-    val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    runGC()
-    intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
-    }
-    rdd.count()  // Defeat early collection by the JVM
-
-    // Test that GC causes shuffle cleanup after dereferencing the RDD
-    val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    rdd = null  // Make RDD out of scope, so that corresponding shuffle goes out of scope
-    runGC()
-    postGCTester.assertCleanup()
-  }
-
-  test("automatically cleanup RDD + shuffle + broadcast in distributed mode") {
-    sc.stop()
-
-    val conf2 = new SparkConf()
-      .setMaster("local-cluster[2, 1, 1024]")
-      .setAppName("ContextCleanerSuite")
-      .set("spark.cleaner.referenceTracking.blocking", "true")
-      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
-      .set("spark.shuffle.manager", shuffleManager.getName)
-    sc = new SparkContext(conf2)
-
-    val numRdds = 10
-    val numBroadcasts = 4 // Broadcasts are more costly
-    val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
-    val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast).toBuffer
-    val rddIds = sc.persistentRdds.keys.toSeq
-    val shuffleIds = 0 until sc.newShuffleId()
-    val broadcastIds = broadcastBuffer.map(_.id)
-
-    val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
-    runGC()
-    intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
-    }
-
-    // Test that GC triggers the cleanup of all variables after the dereferencing them
-    val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
-    broadcastBuffer.clear()
-    rddBuffer.clear()
-    runGC()
-    postGCTester.assertCleanup()
-
-    // Make sure the broadcasted task closure no longer exists after GC.
-    val taskClosureBroadcastId = broadcastIds.max + 1
-    assert(sc.env.blockManager.master.getMatchingBlockIds({
-      case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
-      case _ => false
-    }, askSlaves = true).isEmpty)
-  }
-}
-
-
-/**
  * Class to test whether RDDs, shuffles, etc. have been successfully cleaned.
  * The checkpoint here refers only to normal (reliable) checkpoints, not local checkpoints.
  */
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
deleted file mode 100644
index 10794235ed..0000000000
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.BeforeAndAfterAll
-
-class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
-
-  // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
-
-  override def beforeAll() {
-    super.beforeAll()
-    conf.set("spark.shuffle.manager", "hash")
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d26df7e760..d14728cb50 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
 /** Testsuite that tests block replication in BlockManager */
@@ -44,7 +44,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
   private var master: BlockManagerMaster = null
   private val securityMgr = new SecurityManager(conf)
   private val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  private val shuffleManager = new HashShuffleManager(conf)
+  private val shuffleManager = new SortShuffleManager(conf)
 
   // List of block manager created during an unit test, so that all of the them can be stopped
   // after the unit test.
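The storage suites swap their fixture field the same way in the hunk above and in the BlockManagerSuite hunk below: rather than going through `spark.shuffle.manager`, they construct the manager directly. A minimal standalone sketch of that pattern, using only the calls visible in these hunks:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.shuffle.sort.SortShuffleManager

// Mirrors the replacement lines in the hunks: the sort-based manager is
// built straight from a SparkConf; HashShuffleManager no longer exists.
val conf = new SparkConf(false)
val shuffleManager = new SortShuffleManager(conf)
```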
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a1c2933584..db1efaf2a2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -60,7 +60,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   var master: BlockManagerMaster = null
   val securityMgr = new SecurityManager(new SparkConf(false))
   val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
-  val shuffleManager = new HashShuffleManager(new SparkConf(false))
+  val shuffleManager = new SortShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index dc3185a6d5..2410118fb7 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -237,7 +237,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
   private def testSimpleSpilling(codec: Option[String] = None): Unit = {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, codec)  // Load defaults for Spark home
-    conf.set("spark.shuffle.manager", "hash")  // avoid using external sorter
     conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
@@ -401,7 +400,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
   test("external aggregation updates peak execution memory") {
     val spillThreshold = 1000
     val conf = createSparkConf(loadDefaults = false)
-      .set("spark.shuffle.manager", "hash")  // make sure we're not also using ExternalSorter
       .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
     sc = new SparkContext("local", "test", conf)  // No spilling
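The last two hunks drop the `"hash"` override (whose only purpose was to keep `ExternalSorter` out of the picture) while keeping the forced-spill threshold, so the spilling path is still exercised under the sort-based manager. A minimal sketch of that setup, with the size value copied from the suite and the master URL and app name illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val size = 1000  // matches the suite's value
val conf = new SparkConf()
  .setMaster("local")        // illustrative
  .setAppName("spill-demo")  // illustrative
  // Force a spill every size/4 records so the spilling code path is hit
  // even on small inputs; the key is taken verbatim from the diff above.
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)

val sc = new SparkContext(conf)
// A shuffle-side aggregation large enough to cross the forced-spill threshold.
val sums = sc.parallelize(1 to size).map(i => (i % 10, i)).reduceByKey(_ + _).collect()
sc.stop()
```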