author    Cheng Hao <hao.cheng@intel.com>    2015-09-10 11:48:43 -0700
committer Andrew Or <andrew@databricks.com>  2015-09-10 11:48:43 -0700
commit    e04811137680f937669cdcc78771227aeb7cd849 (patch)
tree      f2c052c949f649b20cb3ed1d6639f5653a82437d /sql
parent    49da38e5f728e05e8e929c4dcd37145ba060151d (diff)
[SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill
Data spill with UnsafeRow causes an assertion failure:

```
java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:165)
  at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
  at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
  at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```

To reproduce (thanks andrewor14):

```scala
bin/spark-shell --master local --conf spark.shuffle.memoryFraction=0.005 --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes #8635 from chenghao-intel/unsafe_spill.
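For context on why `writeKey` ever sees a null key: `UnsafeRowSerializer` never writes key bytes, because the key is only the partition id computed on the map side. When spilled records are read back and re-written during the merge in `ExternalSorter.writePartitionedFile`, the deserialized key is therefore a null placeholder, which is what tripped the old `assert(key.isInstanceOf[Int])`. A rough, self-contained sketch of the two stream methods involved (a simplified stand-in with a hypothetical class name, not the actual Spark source):

```scala
import scala.reflect.ClassTag

// Simplified stand-in for the UnsafeRow serialization/deserialization streams
// (illustration only; the real methods live on Spark's SerializationStream /
// DeserializationStream implementations inside UnsafeRowSerializerInstance).
class UnsafeRowSerDeSketch {
  // Map side: the key is the Int partition id and nothing is written for it.
  // Merge side: writeKey is called again with a key read back from a spill file, which is null.
  def writeKey[T: ClassTag](key: T): this.type = {
    assert(null == key || key.isInstanceOf[Int])
    this // no key bytes are emitted
  }

  // No key bytes were stored, so hand back a dummy null; downstream code only uses the value.
  def readKey[T: ClassTag](): T = null.asInstanceOf[T]
}
```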
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala      |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 64
2 files changed, 61 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 5c18558f9b..e060c06d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
     override def writeKey[T: ClassTag](key: T): SerializationStream = {
       // The key is only needed on the map side when computing partition ids. It does not need to
       // be shuffled.
-      assert(key.isInstanceOf[Int])
+      assert(null == key || key.isInstanceOf[Int])
       this
     }
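One way to read the fix above: the assertion is relaxed rather than removed, so the map-side invariant that the key is an Int partition id is still checked, while the null placeholder that `readKey` hands back for spilled records no longer trips it during the merge.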
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index bd02c73a26..0113d052e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -17,13 +17,17 @@
 package org.apache.spark.sql.execution
-import java.io.{DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{File, DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.util.Utils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
+import org.apache.spark._
 /**
@@ -40,9 +44,15 @@ class ClosableByteArrayInputStream(buf: Array[Byte]) extends ByteArrayInputStrea
 class UnsafeRowSerializerSuite extends SparkFunSuite {
   private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
-    val internalRow = CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow]
+    val converter = unsafeRowConverter(schema)
+    converter(row)
+  }
+
+  private def unsafeRowConverter(schema: Array[DataType]): Row => UnsafeRow = {
     val converter = UnsafeProjection.create(schema)
-    converter.apply(internalRow)
+    (row: Row) => {
+      converter(CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow])
+    }
   }
   test("toUnsafeRow() test helper method") {
@@ -87,4 +97,50 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
     assert(!deserializerIter.hasNext)
     assert(input.closed)
   }
+
+  test("SPARK-10466: external sorter spilling with unsafe row serializer") {
+    var sc: SparkContext = null
+    var outputFile: File = null
+    val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
+    Utils.tryWithSafeFinally {
+      val conf = new SparkConf()
+        .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
+        .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+        .set("spark.shuffle.memoryFraction", "0.0001")
+
+      sc = new SparkContext("local", "test", conf)
+      outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
+      // prepare data
+      val converter = unsafeRowConverter(Array(IntegerType))
+      val data = (1 to 1000).iterator.map { i =>
+        (i, converter(Row(i)))
+      }
+      val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
+        partitioner = Some(new HashPartitioner(10)),
+        serializer = Some(new UnsafeRowSerializer(numFields = 1)))
+
+      // Ensure we spilled something and have to merge them later
+      assert(sorter.numSpills === 0)
+      sorter.insertAll(data)
+      assert(sorter.numSpills > 0)
+
+      // Merging spilled files should not throw assertion error
+      val taskContext =
+        new TaskContextImpl(0, 0, 0, 0, null, null, InternalAccumulator.create(sc))
+      taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
+      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), taskContext, outputFile)
+    } {
+      // Clean up
+      if (sc != null) {
+        sc.stop()
+      }
+
+      // restore the spark env
+      SparkEnv.set(oldEnv)
+
+      if (outputFile != null) {
+        outputFile.delete()
+      }
+    }
+  }
 }