author     Sital Kedia <skedia@fb.com>          2016-06-30 10:53:18 -0700
committer  Davies Liu <davies.liu@gmail.com>    2016-06-30 10:53:18 -0700
commit     07f46afc733b1718d528a6ea5c0d774f047024fa (patch)
tree       d150b6afc44dfdc05e57ebaba4136b0c0e4cf063 /sql
parent     5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98 (diff)
[SPARK-13850] Force the sorter to Spill when number of elements in th…
## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array reaches a certain size. This works around the issue of TimSort failing on large buffer sizes.

## How was this patch tested?

Tested by running a job that was failing without this change due to the TimSort bug.

Author: Sital Kedia <skedia@fb.com>

Closes #13107 from sitalkedia/fix_TimSort.
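Every call site in the diff below reads the same configuration key and forwards it to the sorter. A minimal sketch of that pattern in Scala, assuming a caller-supplied defaultThreshold stands in for UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD (illustrative only, not the sorter's actual internals):

import org.apache.spark.SparkConf

object ForceSpillThresholdSketch {
  // Resolve the force-spill threshold, falling back to the sorter's built-in default.
  def resolve(conf: SparkConf, defaultThreshold: Long): Long =
    conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", defaultThreshold)

  // The idea of the fix: once the in-memory pointer array holds `threshold` records,
  // spill to disk so TimSort never has to sort an oversized buffer.
  def shouldForceSpill(numElementsInMemory: Long, threshold: Long): Boolean =
    numElementsInMemory >= threshold
}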
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java       2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java    3
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java            8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala                      2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala     5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala      2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala        5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala     4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala                 5
9 files changed, 30 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 0b177ad411..b4e87c3035 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -89,6 +89,8 @@ public final class UnsafeExternalRowSorter {
sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
DEFAULT_INITIAL_SORT_BUFFER_SIZE),
pageSizeBytes,
+ SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+ .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
canUseRadixSort
);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 1f1b5389aa..3705291e1f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
/**
* Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -246,6 +247,8 @@ public final class UnsafeFixedWidthAggregationMap {
SparkEnv.get().blockManager(),
SparkEnv.get().serializerManager(),
map.getPageSizeBytes(),
+ SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+ .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
map);
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b1cc523363..daa948f5c6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -55,8 +55,9 @@ public final class UnsafeKVExternalSorter {
StructType valueSchema,
BlockManager blockManager,
SerializerManager serializerManager,
- long pageSizeBytes) throws IOException {
- this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
+ long pageSizeBytes,
+ long numElementsForSpillThreshold) throws IOException {
+ this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, numElementsForSpillThreshold, null);
}
public UnsafeKVExternalSorter(
@@ -65,6 +66,7 @@ public final class UnsafeKVExternalSorter {
BlockManager blockManager,
SerializerManager serializerManager,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
@Nullable BytesToBytesMap map) throws IOException {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
@@ -90,6 +92,7 @@ public final class UnsafeKVExternalSorter {
SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
pageSizeBytes,
+ numElementsForSpillThreshold,
canUseRadixSort);
} else {
// The array will be used to do in-place sort, which require half of the space to be empty.
@@ -136,6 +139,7 @@ public final class UnsafeKVExternalSorter {
SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
pageSizeBytes,
+ numElementsForSpillThreshold,
inMemSorter);
// reset the map, so we can re-use it to insert new records. the inMemSorter will not used
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
index 1b9634cfc0..93f007f5b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
@@ -345,6 +345,8 @@ case class WindowExec(
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
false)
rows.foreach { r =>
sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f56b50a543..9a0b46c1a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/** A container for all the details required when writing to a table. */
@@ -389,7 +390,9 @@ private[sql] class DynamicPartitionWriterContainer(
StructType.fromAttributes(dataColumns),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
while (iterator.hasNext) {
val currentRow = iterator.next()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index d870d91edc..0553086a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -49,6 +49,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
false)
val partition = split.asInstanceOf[CartesianPartition]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index efb04912d7..117d6672ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -209,7 +210,9 @@ class FileStreamSinkWriter(
StructType.fromAttributes(writeColumns),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
while (iterator.hasNext) {
val currentRow = iterator.next()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 03d4be8ee5..3d869c77e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/**
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -123,7 +124,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
metricsSystem = null))
val sorter = new UnsafeKVExternalSorter(
- keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
+ keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
+ pageSize, UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)
// Insert the keys and values into the sorter
inputData.foreach { case (k, v) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 794fe264ea..e65c24e6f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
StructType.fromAttributes(dataOutput),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
while (iterator.hasNext) {
val inputRow = iterator.next()
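For jobs that hit the TimSort failure, the threshold introduced by this patch can be lowered without a code change. A hedged example of overriding it when building the SparkConf (the property name comes from this patch; the value is illustrative only):

import org.apache.spark.SparkConf

// Spill earlier by lowering the force-spill threshold (16M elements here is an arbitrary example).
val conf = new SparkConf()
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (16 * 1024 * 1024).toString)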