author    Sital Kedia <skedia@fb.com>    2016-06-30 10:53:18 -0700
committer Davies Liu <davies.liu@gmail.com>    2016-06-30 10:53:18 -0700
commit    07f46afc733b1718d528a6ea5c0d774f047024fa (patch)
tree      d150b6afc44dfdc05e57ebaba4136b0c0e4cf063 /sql/hive/src
parent    5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98 (diff)
[SPARK-13850] Force the sorter to Spill when number of elements in th…
## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array reaches a certain size. This works around the issue of TimSort failing on large buffer sizes.

## How was this patch tested?

Tested by running a job that was failing without this change due to the TimSort bug.

Author: Sital Kedia <skedia@fb.com>

Closes #13107 from sitalkedia/fix_TimSort.
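The workaround can be illustrated with a minimal Scala sketch. This is purely hypothetical: the class and method names below are illustrative and do not mirror the real UnsafeExternalSorter internals. The idea is to count the records held in the in-memory pointer array and force a spill to disk once the configured threshold is reached, so TimSort never has to sort a buffer larger than that bound.

```scala
// Illustrative sketch only; names and structure are hypothetical and do not
// reflect the actual UnsafeExternalSorter implementation.
class ForceSpillingSorter(numElementsForceSpillThreshold: Long) {
  private var numRecordsInMemory: Long = 0L

  def insertRecord(record: Array[Byte]): Unit = {
    // Work around TimSort failures on very large buffers: spill to disk
    // before the in-memory pointer array grows past the threshold.
    if (numRecordsInMemory >= numElementsForceSpillThreshold) {
      spill()
    }
    appendToPointerArray(record)
    numRecordsInMemory += 1
  }

  private def spill(): Unit = {
    // Sort and write the current in-memory records to disk, then reset
    // the in-memory count so the pointer array can be reused.
    numRecordsInMemory = 0L
  }

  private def appendToPointerArray(record: Array[Byte]): Unit = {
    // Store the record and remember its address in the pointer array.
  }
}
```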
Diffstat (limited to 'sql/hive/src')
-rw-r--r--    sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala    5
1 file changed, 4 insertions, 1 deletion
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 794fe264ea..e65c24e6f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
StructType.fromAttributes(dataOutput),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes)
+ TaskContext.get().taskMemoryManager().pageSizeBytes,
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
while (iterator.hasNext) {
val inputRow = iterator.next()
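As a usage note, the threshold introduced by this change is read from the spark.shuffle.spill.numElementsForceSpillThreshold configuration key, falling back to UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD. A hedged sketch of how a job hitting the TimSort failure might lower it follows; the application name and the threshold value are arbitrary examples, not recommendations.

```scala
import org.apache.spark.SparkConf

object ThresholdConfigExample {
  // Example only: choose a threshold appropriate for the workload; the value
  // below is arbitrary and not a recommended setting.
  val conf: SparkConf = new SparkConf()
    .setAppName("hive-dynamic-partition-write")
    .set("spark.shuffle.spill.numElementsForceSpillThreshold", "4000000")
}
```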