From 07f46afc733b1718d528a6ea5c0d774f047024fa Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Thu, 30 Jun 2016 10:53:18 -0700
Subject: [SPARK-13850] Force the sorter to Spill when number of elements in th…
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array reaches a certain size. This works around the issue of TimSort failing on large buffer sizes.

## How was this patch tested?

Tested by running a job that was failing without this change due to the TimSort bug.

Author: Sital Kedia

Closes #13107 from sitalkedia/fix_TimSort.
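To make the mechanism concrete before the diff, here is a minimal standalone sketch of the idea (illustrative only: `ForceSpillingSorter` and its members are hypothetical stand-ins, not the real `UnsafeExternalSorter` API):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of a sorter that force-spills once the in-memory pointer array
// holds `numElementsForSpillThreshold` entries, so TimSort never runs on
// a buffer larger than that bound.
class ForceSpillingSorter(numElementsForSpillThreshold: Long) {
  private val pointers = ArrayBuffer.empty[Long] // stand-in for the pointer array
  private var spillCount = 0

  def insertRecord(recordPointer: Long): Unit = {
    // The key change: spill on element count, not only on memory pressure,
    // so the array is flushed before it can exceed the threshold.
    if (pointers.length >= numElementsForSpillThreshold) {
      spill()
    }
    pointers += recordPointer
  }

  private def spill(): Unit = {
    val sortedRun = pointers.sorted // at most `threshold` elements are ever sorted
    // ... write `sortedRun` to disk; spilled runs are merged when iterating ...
    pointers.clear()
    spillCount += 1
  }
}
```

Because every call site below passes the threshold down from the `spark.shuffle.spill.numElementsForceSpillThreshold` configuration, the bound is tunable per job.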
---
 .../org/apache/spark/sql/execution/UnsafeExternalRowSorter.java | 2 ++
 .../spark/sql/execution/UnsafeFixedWidthAggregationMap.java | 3 +++
 .../org/apache/spark/sql/execution/UnsafeKVExternalSorter.java | 8 ++++++--
 .../main/scala/org/apache/spark/sql/execution/WindowExec.scala | 2 ++
 .../apache/spark/sql/execution/datasources/WriterContainer.scala | 5 ++++-
 .../apache/spark/sql/execution/joins/CartesianProductExec.scala | 2 ++
 .../org/apache/spark/sql/execution/streaming/FileStreamSink.scala | 5 ++++-
 .../apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala | 4 +++-
 .../scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 5 ++++-
 9 files changed, 30 insertions(+), 6 deletions(-)

(limited to 'sql')

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 0b177ad411..b4e87c3035 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -89,6 +89,8 @@ public final class UnsafeExternalRowSorter {
       sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
         DEFAULT_INITIAL_SORT_BUFFER_SIZE),
       pageSizeBytes,
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+        .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       canUseRadixSort
     );
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 1f1b5389aa..3705291e1f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;

 /**
  * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -246,6 +247,8 @@ public final class UnsafeFixedWidthAggregationMap {
       SparkEnv.get().blockManager(),
       SparkEnv.get().serializerManager(),
       map.getPageSizeBytes(),
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+        .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       map);
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b1cc523363..daa948f5c6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -55,8 +55,9 @@ public final class UnsafeKVExternalSorter {
       StructType valueSchema,
       BlockManager blockManager,
       SerializerManager serializerManager,
-      long pageSizeBytes) throws IOException {
-    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
+      long pageSizeBytes,
+      long numElementsForSpillThreshold) throws IOException {
+    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, numElementsForSpillThreshold, null);
   }

   public UnsafeKVExternalSorter(
@@ -65,6 +66,7 @@ public final class UnsafeKVExternalSorter {
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
     this.valueSchema = valueSchema;
@@ -90,6 +92,7 @@ public final class UnsafeKVExternalSorter {
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
           UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
       // The array will be used to do in-place sort, which require half of the space to be empty.
@@ -136,6 +139,7 @@ public final class UnsafeKVExternalSorter {
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
           UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         inMemSorter);

     // reset the map, so we can re-use it to insert new records. the inMemSorter will not used
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
index 1b9634cfc0..93f007f5b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
@@ -345,6 +345,8 @@ case class WindowExec(
             null,
             1024,
             SparkEnv.get.memoryManager.pageSizeBytes,
+            SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+              UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
             false)
           rows.foreach { r =>
             sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f56b50a543..9a0b46c1a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter


 /** A container for all the details required when writing to a table. */
@@ -389,7 +390,9 @@ private[sql] class DynamicPartitionWriterContainer(
       StructType.fromAttributes(dataColumns),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))

     while (iterator.hasNext) {
       val currentRow = iterator.next()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index d870d91edc..0553086a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -49,6 +49,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
       null,
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       false)

     val partition = split.asInstanceOf[CartesianPartition]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index efb04912d7..117d6672ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -209,7 +210,9 @@ class FileStreamSinkWriter(
       StructType.fromAttributes(writeColumns),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))

     while (iterator.hasNext) {
       val currentRow = iterator.next()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 03d4be8ee5..3d869c77e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

 /**
  * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -123,7 +124,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
         metricsSystem = null))

     val sorter = new UnsafeKVExternalSorter(
-      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
+      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
+      pageSize, UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)

     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 794fe264ea..e65c24e6f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       StructType.fromAttributes(dataOutput),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))

     while (iterator.hasNext) {
       val inputRow = iterator.next()
--
cgit v1.2.3
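As a usage note, every call site in the patch reads the threshold from the job configuration, so it can be lowered for workloads that hit the TimSort failure. A minimal sketch of setting it when building a job (the value is an arbitrary illustration, not a recommended setting; the default comes from `UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD`, which is defined outside the sql/ portion shown here):

```scala
import org.apache.spark.SparkConf

// Hypothetical job setup that lowers the force-spill threshold so in-memory
// sort buffers stay small enough to avoid the TimSort bug.
val conf = new SparkConf()
  .setAppName("SPARK-13850-workaround-example")
  // 16M elements is an arbitrary illustrative value.
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (16 * 1024 * 1024).toString)
```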