author    | Josh Rosen <joshrosen@databricks.com> | 2015-10-22 09:46:30 -0700
committer | Josh Rosen <joshrosen@databricks.com> | 2015-10-22 09:46:30 -0700
commit    | f6d06adf05afa9c5386dc2396c94e7a98730289f (patch)
tree      | 9e3d8e4350e0a465124840eea91f6aa39c00b156 /sql/core
parent    | 94e2064fa1b04c05c805d9175c7c78bf583db5c6 (diff)
[SPARK-10708] Consolidate sort shuffle implementations
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Now that UnsafeShuffleManager supports large records, the two provide the same set of functionality, so we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and merge the two managers together.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8829 from JoshRosen/consolidate-sort-shuffle-implementations.
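The heart of the change to `Exchange` (full diff below) is that sort-based shuffle is now detected by a single manager type and the old `spark.shuffle.sort.serializeMapOutputs` feature flag is gone, so the decision about copying rows before a shuffle reduces to the bypass-merge check plus `Serializer.supportsRelocationOfSerializedObjects`. Below is a minimal Scala sketch of that decision logic; it is a simplified restatement rather than the method from the patch, the name `needToCopyRowsBeforeShuffle` and its parameter list are stand-ins, and it will not compile outside Spark's own packages because some of these APIs are `private[spark]`.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.shuffle.sort.SortShuffleManager

// Hypothetical, simplified restatement of the post-consolidation copy decision in Exchange.
def needToCopyRowsBeforeShuffle(
    shuffleManager: ShuffleManager,
    serializer: Serializer,
    numPartitions: Int,
    conf: SparkConf): Boolean = {
  val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
  if (shuffleManager.isInstanceOf[SortShuffleManager]) {
    if (numPartitions <= bypassMergeThreshold) {
      // The bypass-merge path writes each record straight to a per-partition file and never
      // buffers deserialized rows, so no defensive copy is needed.
      false
    } else if (serializer.supportsRelocationOfSerializedObjects) {
      // Serialized sort shuffle (SPARK-4550 / SPARK-7081) sorts records in serialized form,
      // so mutable rows are never buffered as deserialized objects and no copy is needed.
      false
    } else {
      // ExternalSorter buffers deserialized records in memory, so mutable rows must be copied.
      true
    }
  } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
    // Hash-based shuffle: assume a copy is required (the real code has further nuance
    // not modeled in this sketch).
    true
  } else {
    true
  }
}
```

The net effect is that `Exchange` only has to reason about one sort-based manager, and whether the copy can be skipped becomes purely a property of the serializer.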
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 23
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 9
2 files changed, 12 insertions, 20 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 1d3379a5e2..7f60c8f5ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
@@ -87,10 +86,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
     // fewer partitions (like RangePartitioner, for example).
     val conf = child.sqlContext.sparkContext.conf
     val shuffleManager = SparkEnv.get.shuffleManager
-    val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] ||
-      shuffleManager.isInstanceOf[UnsafeShuffleManager]
+    val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
-    val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
     if (sortBasedShuffleOn) {
       val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
       if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
@@ -99,22 +96,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         // doesn't buffer deserialized records.
         // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
         false
-      } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
-        // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
-        // them. This optimization is guarded by a feature-flag and is only applied in cases where
-        // shuffle dependency does not specify an aggregator or ordering and the record serializer
-        // has certain properties. If this optimization is enabled, we can safely avoid the copy.
+      } else if (serializer.supportsRelocationOfSerializedObjects) {
+        // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
+        // prior to sorting them. This optimization is only applied in cases where shuffle
+        // dependency does not specify an aggregator or ordering and the record serializer has
+        // certain properties. If this optimization is enabled, we can safely avoid the copy.
         //
         // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
         // need to check whether the optimization is enabled and supported by our serializer.
-        //
-        // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
         false
       } else {
-        // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory. This code
-        // path is used both when SortShuffleManager is used and when UnsafeShuffleManager falls
-        // back to SortShuffleManager to perform a shuffle that the new fast path can't handle. In
-        // both cases, we must copy.
+        // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
+        // copy.
         true
       }
     } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 75d1fced59..1680d7e0a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -101,7 +101,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
     val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
     Utils.tryWithSafeFinally {
       val conf = new SparkConf()
-        .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
+        .set("spark.shuffle.spill.initialMemoryThreshold", "1")
         .set("spark.shuffle.sort.bypassMergeThreshold", "0")
         .set("spark.testing.memory", "80000")
@@ -109,7 +109,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
       // prepare data
       val converter = unsafeRowConverter(Array(IntegerType))
-      val data = (1 to 1000).iterator.map { i =>
+      val data = (1 to 10000).iterator.map { i =>
         (i, converter(Row(i)))
       }
       val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
@@ -141,9 +141,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("SPARK-10403: unsafe row serializer with UnsafeShuffleManager") {
-    val conf = new SparkConf()
-      .set("spark.shuffle.manager", "tungsten-sort")
+  test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
+    val conf = new SparkConf().set("spark.shuffle.manager", "sort")
     sc = new SparkContext("local", "test", conf)
     val row = Row("Hello", 123)
     val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))
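For context on the test changes above, the suite now exercises the consolidated sort shuffle path with the plain "sort" manager and aggressive spill settings, instead of selecting "tungsten-sort" explicitly. The following is a hypothetical standalone sketch of that configuration only; the object name is made up, and the actual UnsafeRow setup, ExternalSorter wiring, and assertions live in UnsafeRowSerializerSuite and are not reproduced here.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SortShuffleSpillSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the updated test configuration: one unified sort-based shuffle manager,
    // with thresholds tuned so spilling happens almost immediately.
    val conf = new SparkConf()
      .set("spark.shuffle.manager", "sort")                   // consolidated sort-based shuffle
      .set("spark.shuffle.spill.initialMemoryThreshold", "1") // spill almost immediately
      .set("spark.shuffle.sort.bypassMergeThreshold", "0")    // never take the bypass-merge path
      .set("spark.testing.memory", "80000")                   // small memory budget, as in the test

    val sc = new SparkContext("local", "shuffle-sketch", conf)
    try {
      // ... build UnsafeRows, shuffle them, and check the round-tripped values here ...
    } finally {
      sc.stop()
    }
  }
}
```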