author | Sital Kedia <skedia@fb.com> | 2016-06-30 10:53:18 -0700
---|---|---
committer | Davies Liu <davies.liu@gmail.com> | 2016-06-30 10:53:18 -0700
commit | 07f46afc733b1718d528a6ea5c0d774f047024fa |
tree | d150b6afc44dfdc05e57ebaba4136b0c0e4cf063 /sql/hive/src/main |
parent | 5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98 |
[SPARK-13850] Force the sorter to Spill when number of elements in the pointer array reach a certain size
## What changes were proposed in this pull request?
Force the sorter to spill when the number of elements in the pointer array reaches a certain size. This works around an issue where TimSort fails on very large buffers.
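For context, a minimal sketch (not part of this patch) of how a job hitting the TimSort failure could lower the threshold explicitly. The config key `spark.shuffle.spill.numElementsForceSpillThreshold` is the one the diff below reads; the object name and the value `16 * 1024 * 1024` are purely illustrative.

```scala
// Minimal sketch, assuming only the public SparkConf API: lower the
// force-spill threshold so the in-memory pointer array stays small enough
// for TimSort. The value here is illustrative, not a recommendation.
import org.apache.spark.SparkConf

object ForceSpillWorkaroundSketch {
  val conf = new SparkConf()
    .setAppName("force-spill-workaround")
    .set("spark.shuffle.spill.numElementsForceSpillThreshold",
      (16 * 1024 * 1024).toString)
}
```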
## How was this patch tested?
Tested by running a job that was failing without this change due to the TimSort bug.
Author: Sital Kedia <skedia@fb.com>
Closes #13107 from sitalkedia/fix_TimSort.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 5
1 file changed, 4 insertions(+), 1 deletion(-)
```diff
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 794fe264ea..e65c24e6f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       StructType.fromAttributes(dataOutput),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
     while (iterator.hasNext) {
       val inputRow = iterator.next()
```
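To illustrate the mechanism the new constructor argument enables, here is a simplified sketch of a force-spill check in a sorter's insert path. The class and field names (`SpillingSorterSketch`, `numRecordsInMemory`, `forceSpillThreshold`) are hypothetical stand-ins, not the actual internals of `UnsafeExternalSorter`.

```scala
// Simplified sketch, assuming hypothetical names: once the in-memory pointer
// array holds `forceSpillThreshold` records, the sorter spills to disk, so a
// single TimSort run never operates on an oversized buffer.
class SpillingSorterSketch(forceSpillThreshold: Long) {
  private var numRecordsInMemory: Long = 0L

  // Stand-in for sorting the in-memory records, writing them to disk,
  // and resetting the pointer array.
  private def spill(): Unit = {
    numRecordsInMemory = 0L
  }

  def insertRecord(): Unit = {
    if (numRecordsInMemory >= forceSpillThreshold) {
      spill()
    }
    numRecordsInMemory += 1
  }
}
```

The design point of the patch is that the threshold is configurable (via `spark.shuffle.spill.numElementsForceSpillThreshold`) with a library default (`UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD`), so affected jobs can force earlier spills without changing behavior for everyone else.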