author    Cheng Hao <hao.cheng@intel.com>    2015-09-10 11:48:43 -0700
committer Andrew Or <andrew@databricks.com>  2015-09-10 11:48:43 -0700
commit    e04811137680f937669cdcc78771227aeb7cd849 (patch)
tree      f2c052c949f649b20cb3ed1d6639f5653a82437d /core
parent    49da38e5f728e05e8e929c4dcd37145ba060151d (diff)
[SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill
Data spill with UnsafeRow causes an assertion failure:

```
java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:165)
  at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
  at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```

To reproduce it (thanks andrewor14): the tiny `spark.shuffle.memoryFraction` forces the external sorter to spill, and `spark.shuffle.sort.bypassMergeThreshold=0` keeps the shuffle on the sort-based path that serializes the spilled records.

```scala
bin/spark-shell --master local \
  --conf spark.shuffle.memoryFraction=0.005 \
  --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes #8635 from chenghao-intel/unsafe_spill.
Diffstat (limited to 'core')
-rw-r--r--   core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala   6
1 file changed, 6 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 19287edbaf..138c05dff1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val spills = new ArrayBuffer[SpilledFile]
 
+  /**
+   * Number of files this sorter has spilled so far.
+   * Exposed for testing.
+   */
+  private[spark] def numSpills: Int = spills.size
+
   override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
     // TODO: stop combining if we find that the reduction factor isn't high
     val shouldCombine = aggregator.isDefined
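
Not part of the patch: a minimal sketch of how a test living inside the Spark codebase might use the newly exposed `numSpills` accessor to confirm that a sort actually spilled to disk. The object and helper names below are hypothetical.

```scala
// Hypothetical sketch (not from this commit): numSpills is private[spark],
// so a caller must live somewhere under the org.apache.spark package.
package org.apache.spark.util.collection

object SpillAssertions {
  // Fails the calling test unless the sorter wrote at least one spill file,
  // i.e. unless spills.size (returned by numSpills) is non-zero.
  def assertSpilled[K, V, C](sorter: ExternalSorter[K, V, C]): Unit = {
    assert(sorter.numSpills > 0,
      s"expected at least one spill file, got ${sorter.numSpills}")
  }
}
```

In practice such a check would run after forcing spills, for example with a very small `spark.shuffle.memoryFraction` as in the reproduction above.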