author     Josh Rosen <joshrosen@databricks.com>    2015-10-22 09:46:30 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-10-22 09:46:30 -0700
commit     f6d06adf05afa9c5386dc2396c94e7a98730289f (patch)
tree       9e3d8e4350e0a465124840eea91f6aa39c00b156 /sql/core
parent     94e2064fa1b04c05c805d9175c7c78bf583db5c6 (diff)
[SPARK-10708] Consolidate sort shuffle implementations
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Now that UnsafeShuffleManager supports large records, the two managers provide the same set of functionality, so this patch folds UnsafeShuffleManager's serialized shuffle implementation into SortShuffleManager and merges the two managers together.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8829 from JoshRosen/consolidate-sort-shuffle-implementations.
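One immediate simplification for callers: code that needed to know whether sort-based shuffle was active previously had to test for both manager classes, and after the merge a single type check suffices, as the Exchange hunk below shows. A minimal sketch (the imports are the real Spark classes; the standalone method is illustrative only):

    import org.apache.spark.SparkEnv
    import org.apache.spark.shuffle.sort.SortShuffleManager

    // After this patch the serialized (unsafe) path lives inside
    // SortShuffleManager itself, so one type check covers both code paths.
    def sortBasedShuffleOn: Boolean =
      SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]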
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala                 | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala |  9
2 files changed, 12 insertions(+), 20 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 1d3379a5e2..7f60c8f5ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
@@ -87,10 +86,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
// fewer partitions (like RangePartitioner, for example).
val conf = child.sqlContext.sparkContext.conf
val shuffleManager = SparkEnv.get.shuffleManager
- val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] ||
- shuffleManager.isInstanceOf[UnsafeShuffleManager]
+ val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
- val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
if (sortBasedShuffleOn) {
val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
@@ -99,22 +96,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
// doesn't buffer deserialized records.
// Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
false
- } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
- // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
- // them. This optimization is guarded by a feature-flag and is only applied in cases where
- // shuffle dependency does not specify an aggregator or ordering and the record serializer
- // has certain properties. If this optimization is enabled, we can safely avoid the copy.
+ } else if (serializer.supportsRelocationOfSerializedObjects) {
+ // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
+ // prior to sorting them. This optimization is only applied in cases where shuffle
+ // dependency does not specify an aggregator or ordering and the record serializer has
+ // certain properties. If this optimization is enabled, we can safely avoid the copy.
//
// Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
// need to check whether the optimization is enabled and supported by our serializer.
- //
- // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
false
} else {
- // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory. This code
- // path is used both when SortShuffleManager is used and when UnsafeShuffleManager falls
- // back to SortShuffleManager to perform a shuffle that the new fast path can't handle. In
- // both cases, we must copy.
+ // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
+ // copy.
true
}
} else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
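For reference, the copy decision in Exchange reads roughly as follows once the hunks above are applied. This is a sketch stitched together from the diff, not the full method: `partitioner` and `serializer` are defined earlier in the real code, and the trailing HashShuffleManager branch is elided.

    val conf = child.sqlContext.sparkContext.conf
    val shuffleManager = SparkEnv.get.shuffleManager
    val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    if (sortBasedShuffleOn) {
      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
        // The bypass-merge path writes records straight to per-partition files
        // and never buffers deserialized objects, so no defensive copy is needed.
        false
      } else if (serializer.supportsRelocationOfSerializedObjects) {
        // Serialized shuffle (SPARK-4550, SPARK-7081): records are serialized
        // before sorting, so the copy can safely be skipped here as well.
        false
      } else {
        // ExternalSorter buffers deserialized records in memory, so we must copy.
        true
      }
    } // ... followed by the HashShuffleManager branch visible at the end of the hunk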
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 75d1fced59..1680d7e0a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -101,7 +101,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
Utils.tryWithSafeFinally {
val conf = new SparkConf()
- .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
+ .set("spark.shuffle.spill.initialMemoryThreshold", "1")
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
.set("spark.testing.memory", "80000")
@@ -109,7 +109,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
// prepare data
val converter = unsafeRowConverter(Array(IntegerType))
- val data = (1 to 1000).iterator.map { i =>
+ val data = (1 to 10000).iterator.map { i =>
(i, converter(Row(i)))
}
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
@@ -141,9 +141,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
}
}
- test("SPARK-10403: unsafe row serializer with UnsafeShuffleManager") {
- val conf = new SparkConf()
- .set("spark.shuffle.manager", "tungsten-sort")
+ test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
+ val conf = new SparkConf().set("spark.shuffle.manager", "sort")
sc = new SparkContext("local", "test", conf)
val row = Row("Hello", 123)
val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))
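The renamed test drives UnsafeRowSerializer through the consolidated manager. A minimal sketch of the configuration it now uses ("sort" selects SortShuffleManager, which picks the serialized path internally when, as the Exchange comment above notes, the serializer supports relocation of serialized objects):

    import org.apache.spark.{SparkConf, SparkContext}

    // Request the consolidated sort-based manager by its short name; the
    // separate "tungsten-sort" name is no longer needed by this test.
    val conf = new SparkConf().set("spark.shuffle.manager", "sort")
    val sc = new SparkContext("local", "test", conf)
    // ... build UnsafeRows and run a shuffle, as in the suite above ...
    sc.stop()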