author     Sital Kedia <skedia@fb.com>        2016-06-30 10:53:18 -0700
committer  Davies Liu <davies.liu@gmail.com>  2016-06-30 10:53:18 -0700
commit     07f46afc733b1718d528a6ea5c0d774f047024fa (patch)
tree       d150b6afc44dfdc05e57ebaba4136b0c0e4cf063
parent     5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98 (diff)
[SPARK-13850] Force the sorter to Spill when number of elements in the pointer array reach a certain size

## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array reaches a certain size. This works around the issue of TimSort failing on large buffer sizes.

## How was this patch tested?

Tested by running a job which was failing without this change due to the TimSort bug.

Author: Sital Kedia <skedia@fb.com>

Closes #13107 from sitalkedia/fix_TimSort.
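As a rough illustration (not part of the patch), the force-spill threshold the patch reads could be tuned from a driver program or spark-shell via the configuration key shown in the diff below; the app name and the 512M-element value here are placeholders, not recommendations:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Lower the force-spill threshold introduced by this patch. The config key and
// its default (1024 * 1024 * 1024 elements for the shuffle sorter) come from the
// diff below; the 512M-element value is purely illustrative.
val conf = new SparkConf()
  .setAppName("timsort-spill-threshold-example")
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (512L * 1024 * 1024).toString)

val spark = SparkSession.builder().config(conf).getOrCreate()
```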
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java | 10
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 23
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java | 3
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java | 2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java | 3
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 5
12 files changed, 60 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef86b5..696ee73a76 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
private final TaskContext taskContext;
private final ShuffleWriteMetrics writeMetrics;
- /** Force this sorter to spill when there are this many elements in memory. For testing only */
+ /**
+ * Force this sorter to spill when there are this many elements in memory. The default value is
+ * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G.
+ */
private final long numElementsForSpillThreshold;
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.numElementsForSpillThreshold =
- conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
+ conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// for tests
assert(inMemSorter != null);
- if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+ if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+ logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
spill();
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ec15f0b59d..d6a255ed9d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
@@ -60,6 +61,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private final int fileBufferSizeBytes;
/**
+ * Force this sorter to spill when there are this many elements in memory. The default value is
+ * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array to be 8G.
+ */
+ public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2;
+
+ private final long numElementsForSpillThreshold;
+ /**
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
* this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
@@ -88,9 +96,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
- serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+ serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold,
pageSizeBytes, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
// The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -107,9 +116,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
boolean canUseRadixSort) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
- taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null,
+ taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null,
canUseRadixSort);
}
@@ -122,6 +132,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
@Nullable UnsafeInMemorySorter existingInMemorySorter,
boolean canUseRadixSort) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -143,6 +154,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
this.inMemSorter = existingInMemorySorter;
}
this.peakMemoryUsedBytes = getMemoryUsage();
+ this.numElementsForSpillThreshold = numElementsForSpillThreshold;
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -373,6 +385,12 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull)
throws IOException {
+ assert(inMemSorter != null);
+ if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+ logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
+ spill();
+ }
+
growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
final int required = length + 4;
@@ -384,7 +402,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
pageCursor += 4;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
- assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index bce958c3dc..3ea99233fe 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -176,6 +176,7 @@ public class UnsafeExternalSorterSuite {
prefixComparator,
/* initialSize */ 1024,
pageSizeBytes,
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
}
@@ -399,6 +400,7 @@ public class UnsafeExternalSorterSuite {
null,
/* initialSize */ 1024,
pageSizeBytes,
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
long[] record = new long[100];
int recordSize = record.length * 8;
@@ -435,6 +437,7 @@ public class UnsafeExternalSorterSuite {
prefixComparator,
1024,
pageSizeBytes,
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
// Peak memory should be monotonically increasing. More specifically, every time
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 0b177ad411..b4e87c3035 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -89,6 +89,8 @@ public final class UnsafeExternalRowSorter {
sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
DEFAULT_INITIAL_SORT_BUFFER_SIZE),
pageSizeBytes,
+ SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+ .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
canUseRadixSort
);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 1f1b5389aa..3705291e1f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
/**
* Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -246,6 +247,8 @@ public final class UnsafeFixedWidthAggregationMap {
SparkEnv.get().blockManager(),
SparkEnv.get().serializerManager(),
map.getPageSizeBytes(),
+ SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+ .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
map);
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b1cc523363..daa948f5c6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -55,8 +55,9 @@ public final class UnsafeKVExternalSorter {
StructType valueSchema,
BlockManager blockManager,
SerializerManager serializerManager,
- long pageSizeBytes) throws IOException {
- this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
+ long pageSizeBytes,
+ long numElementsForSpillThreshold) throws IOException {
+ this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, numElementsForSpillThreshold, null);
}
public UnsafeKVExternalSorter(
@@ -65,6 +66,7 @@ public final class UnsafeKVExternalSorter {
BlockManager blockManager,
SerializerManager serializerManager,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
@Nullable BytesToBytesMap map) throws IOException {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
@@ -90,6 +92,7 @@ public final class UnsafeKVExternalSorter {
SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
pageSizeBytes,
+ numElementsForSpillThreshold,
canUseRadixSort);
} else {
// The array will be used to do in-place sort, which require half of the space to be empty.
@@ -136,6 +139,7 @@ public final class UnsafeKVExternalSorter {
SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
pageSizeBytes,
+ numElementsForSpillThreshold,
inMemSorter);
// reset the map, so we can re-use it to insert new records. the inMemSorter will not used
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
index 1b9634cfc0..93f007f5b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
@@ -345,6 +345,8 @@ case class WindowExec(
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
false)
rows.foreach { r =>
sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f56b50a543..9a0b46c1a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/** A container for all the details required when writing to a table. */
@@ -389,7 +390,9 @@ private[sql] class DynamicPartitionWriterContainer(
StructType.fromAttributes(dataColumns),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
while (iterator.hasNext) {
val currentRow = iterator.next()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index d870d91edc..0553086a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -49,6 +49,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
false)
val partition = split.asInstanceOf[CartesianPartition]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index efb04912d7..117d6672ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -209,7 +210,9 @@ class FileStreamSinkWriter(
StructType.fromAttributes(writeColumns),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
while (iterator.hasNext) {
val currentRow = iterator.next()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 03d4be8ee5..3d869c77e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/**
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -123,7 +124,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
metricsSystem = null))
val sorter = new UnsafeKVExternalSorter(
- keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
+ keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
+ pageSize, UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)
// Insert the keys and values into the sorter
inputData.foreach { case (k, v) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 794fe264ea..e65c24e6f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
StructType.fromAttributes(dataOutput),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
while (iterator.hasNext) {
val inputRow = iterator.next()