aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Reynold Xin <rxin@apache.org>, 2014-09-07 20:42:07 -0700
committer: Reynold Xin <rxin@apache.org>, 2014-09-07 20:42:07 -0700
commit: f25bbbdb3ac5620850c7d09d6a63af888411ecf1 (patch)
tree: c0af880726ce3e961dd59c8f76e36ee364569460 /core
parent: 4ba2673569f8c6da7f7348977f52f98f40dfbfec (diff)
download: spark-f25bbbdb3ac5620850c7d09d6a63af888411ecf1.tar.gz
spark-f25bbbdb3ac5620850c7d09d6a63af888411ecf1.tar.bz2
spark-f25bbbdb3ac5620850c7d09d6a63af888411ecf1.zip
[SPARK-3280] Made sort-based shuffle the default implementation
Sort-based shuffle has lower memory usage and seems to outperform hash-based in almost all of our testing.

Author: Reynold Xin <rxin@apache.org>

Closes #2178 from rxin/sort-shuffle and squashes the following commits:

713d341 [Reynold Xin] Fixed test failures by setting spark.shuffle.compress to the same value as spark.shuffle.spill.compress.
85165e6 [Reynold Xin] Fixed a comment typo.
aa0d372 [Reynold Xin] [SPARK-3280] Made sort-based shuffle the default implementation
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/HashShuffleSuite.scala | 33
-rw-r--r-- core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/SortShuffleSuite.scala | 3
-rw-r--r-- core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 1
5 files changed, 37 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 2973d002cc..20a7444cfc 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -217,7 +217,7 @@ object SparkEnv extends Logging {
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
- val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+ val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
new file mode 100644
index 0000000000..2acc02a54f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.BeforeAndAfterAll
+
+class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
+
+ // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
+
+ override def beforeAll() {
+ System.setProperty("spark.shuffle.manager", "hash")
+ }
+
+ override def afterAll() {
+ System.clearProperty("spark.shuffle.manager")
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index b13ddf96bc..15aa4d8380 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair
-class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
+abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val conf = new SparkConf(loadDefaults = false)
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index 5c02c00586..639e56c488 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -24,8 +24,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with sort-based shuffle.
override def beforeAll() {
- System.setProperty("spark.shuffle.manager",
- "org.apache.spark.shuffle.sort.SortShuffleManager")
+ System.setProperty("spark.shuffle.manager", "sort")
}
override def afterAll() {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index ac3931e3d0..511d76c914 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -42,6 +42,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
+ conf.set("spark.shuffle.compress", codec.isDefined.toString)
codec.foreach { c => conf.set("spark.io.compression.codec", c) }
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")