author    Reynold Xin <rxin@databricks.com>    2016-04-18 19:30:00 -0700
committer Reynold Xin <rxin@databricks.com>    2016-04-18 19:30:00 -0700
commit    5e92583d38e11d39deb429a39725443111205a4a (patch)
tree      3ca0408257968f37a7e5e4d0c35ed05f449c145d /core/src/test/scala
parent    4b3d1294aeecc0001a7fa48c92796e6075d34540 (diff)
[SPARK-14667] Remove HashShuffleManager
## What changes were proposed in this pull request?

The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager.

## How was this patch tested?

Removed some tests related to the old manager.

Author: Reynold Xin <rxin@databricks.com>

Closes #12423 from rxin/SPARK-14667.
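For context, the setting being retired is `spark.shuffle.manager`, which the test suites below set explicitly. The following is a minimal sketch, not part of this patch (the master, app name, and sample job are purely illustrative), of how a shuffle implementation is selected through that key; after this change only the sort-based manager remains, so setting it explicitly is redundant:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration for illustration only.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("shuffle-manager-demo")
  .set("spark.shuffle.manager", "sort") // before this patch, "hash" was also accepted

val sc = new SparkContext(conf)
try {
  // Any shuffle-producing operation now goes through the sort-based shuffle.
  val counts = sc.parallelize(1 to 100).map(i => (i % 10, 1)).reduceByKey(_ + _)
  counts.collect()
} finally {
  sc.stop()
}
```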
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala                         81
-rw-r--r--  core/src/test/scala/org/apache/spark/HashShuffleSuite.scala                            30
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala         4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                    4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala   2
5 files changed, 5 insertions, 116 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index f98150536d..69ff6c7c28 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -30,7 +30,6 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage._
@@ -39,7 +38,7 @@ import org.apache.spark.storage._
* suitable for cleaner tests and provides some utility functions. Subclasses can use different
* config options, in particular, a different shuffle manager class
*/
-abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[HashShuffleManager])
+abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
implicit val defaultTimeout = timeout(10000 millis)
@@ -354,84 +353,6 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
/**
- * A copy of the shuffle tests for sort-based shuffle
- */
-class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[SortShuffleManager]) {
- test("cleanup shuffle") {
- val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
- val collected = rdd.collect().toList
- val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
-
- // Explicit cleanup
- shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
- tester.assertCleanup()
-
- // Verify that shuffles can be re-executed after cleaning up
- assert(rdd.collect().toList.equals(collected))
- }
-
- test("automatically cleanup shuffle") {
- var rdd = newShuffleRDD()
- rdd.count()
-
- // Test that GC does not cause shuffle cleanup due to a strong reference
- val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
- runGC()
- intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
- }
- rdd.count() // Defeat early collection by the JVM
-
- // Test that GC causes shuffle cleanup after dereferencing the RDD
- val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
- rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
- runGC()
- postGCTester.assertCleanup()
- }
-
- test("automatically cleanup RDD + shuffle + broadcast in distributed mode") {
- sc.stop()
-
- val conf2 = new SparkConf()
- .setMaster("local-cluster[2, 1, 1024]")
- .setAppName("ContextCleanerSuite")
- .set("spark.cleaner.referenceTracking.blocking", "true")
- .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
- .set("spark.shuffle.manager", shuffleManager.getName)
- sc = new SparkContext(conf2)
-
- val numRdds = 10
- val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast).toBuffer
- val rddIds = sc.persistentRdds.keys.toSeq
- val shuffleIds = 0 until sc.newShuffleId()
- val broadcastIds = broadcastBuffer.map(_.id)
-
- val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
- runGC()
- intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
- }
-
- // Test that GC triggers the cleanup of all variables after the dereferencing them
- val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
- broadcastBuffer.clear()
- rddBuffer.clear()
- runGC()
- postGCTester.assertCleanup()
-
- // Make sure the broadcasted task closure no longer exists after GC.
- val taskClosureBroadcastId = broadcastIds.max + 1
- assert(sc.env.blockManager.master.getMatchingBlockIds({
- case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
- case _ => false
- }, askSlaves = true).isEmpty)
- }
-}
-
-
-/**
* Class to test whether RDDs, shuffles, etc. have been successfully cleaned.
* The checkpoint here refers only to normal (reliable) checkpoints, not local checkpoints.
*/
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
deleted file mode 100644
index 10794235ed..0000000000
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.BeforeAndAfterAll
-
-class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
-
- // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
-
- override def beforeAll() {
- super.beforeAll()
- conf.set("spark.shuffle.manager", "hash")
- }
-}
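The deleted suite above simply re-ran ShuffleSuite with the hash manager pinned. A minimal sketch of the sort-based counterpart (the class name is assumed for illustration; it reuses the same `ShuffleSuite` base class and `conf` field as the deleted file) would look like:

```scala
package org.apache.spark

import org.scalatest.BeforeAndAfterAll

// Hypothetical sketch: run all tests in ShuffleSuite with sort-based shuffle
// pinned explicitly, mirroring the structure of the removed hash variant.
class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {

  override def beforeAll() {
    super.beforeAll()
    conf.set("spark.shuffle.manager", "sort")
  }
}
```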
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d26df7e760..d14728cb50 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
/** Testsuite that tests block replication in BlockManager */
@@ -44,7 +44,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
private var master: BlockManagerMaster = null
private val securityMgr = new SecurityManager(conf)
private val mapOutputTracker = new MapOutputTrackerMaster(conf)
- private val shuffleManager = new HashShuffleManager(conf)
+ private val shuffleManager = new SortShuffleManager(conf)
// List of block manager created during an unit test, so that all of the them can be stopped
// after the unit test.
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a1c2933584..db1efaf2a2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -60,7 +60,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
var master: BlockManagerMaster = null
val securityMgr = new SecurityManager(new SparkConf(false))
val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
- val shuffleManager = new HashShuffleManager(new SparkConf(false))
+ val shuffleManager = new SortShuffleManager(new SparkConf(false))
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index dc3185a6d5..2410118fb7 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -237,7 +237,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val size = 1000
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
- conf.set("spark.shuffle.manager", "hash") // avoid using external sorter
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
@@ -401,7 +400,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("external aggregation updates peak execution memory") {
val spillThreshold = 1000
val conf = createSparkConf(loadDefaults = false)
- .set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
.set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
sc = new SparkContext("local", "test", conf)
// No spilling