diff options
author | Yin Huai <yhuai@databricks.com> | 2015-07-07 18:36:35 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-07-07 18:48:12 -0700 |
commit | d3d5f2ab25acaf261753738e63ae30a34d28c291 (patch) | |
tree | d91eb7156e4c136c88ba9fdfcbaf86f93b0cecca | |
parent | 83a621a5a8f8a2991c4cfa687279589e5c623d46 (diff) | |
download | spark-d3d5f2ab25acaf261753738e63ae30a34d28c291.tar.gz spark-d3d5f2ab25acaf261753738e63ae30a34d28c291.tar.bz2 spark-d3d5f2ab25acaf261753738e63ae30a34d28c291.zip |
[SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns
https://issues.apache.org/jira/browse/SPARK-8868
Author: Yin Huai <yhuai@databricks.com>
Closes #7262 from yhuai/SPARK-8868 and squashes the following commits:
cb58780 [Yin Huai] Andrew's comment.
e456857 [Yin Huai] Josh's comments.
5122e65 [Yin Huai] If types of all columns are NullTypes, do not use serializer2.
(cherry picked from commit 68a4a169714e11d8c537ad9431ae9974f6b7e8d3)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala | 25 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala | 20 |
2 files changed, 39 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index 256d527d7b..65c0e52012 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -186,23 +186,38 @@ private[sql] object SparkSqlSerializer2 { /** * Check if rows with the given schema can be serialized with ShuffleSerializer. + * Right now, we do not support a schema having complex types or UDTs, or all data types + * of fields are NullTypes. */ def support(schema: Array[DataType]): Boolean = { if (schema == null) return true + var allNullTypes = true var i = 0 while (i < schema.length) { schema(i) match { - case udt: UserDefinedType[_] => return false - case array: ArrayType => return false - case map: MapType => return false - case struct: StructType => return false + case NullType => // Do nothing + case udt: UserDefinedType[_] => + allNullTypes = false + return false + case array: ArrayType => + allNullTypes = false + return false + case map: MapType => + allNullTypes = false + return false + case struct: StructType => + allNullTypes = false + return false case _ => + allNullTypes = false } i += 1 } - return true + // If types of fields are all NullTypes, we return false. + // Otherwise, we return true. 
+ return !allNullTypes } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala index 6ca5390cde..05af102da5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala @@ -42,7 +42,6 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite { } checkSupported(null, isSupported = true) - checkSupported(NullType, isSupported = true) checkSupported(BooleanType, isSupported = true) checkSupported(ByteType, isSupported = true) checkSupported(ShortType, isSupported = true) @@ -57,6 +56,8 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite { checkSupported(DecimalType(10, 5), isSupported = true) checkSupported(DecimalType.Unlimited, isSupported = true) + // If NullType is the only data type in the schema, we do not support it. + checkSupported(NullType, isSupported = false) // For now, ArrayType, MapType, and StructType are not supported. checkSupported(ArrayType(DoubleType, true), isSupported = false) checkSupported(ArrayType(StringType, false), isSupported = false) @@ -169,6 +170,23 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll val df = sql(s"SELECT 1 + 1 FROM shuffle") checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer]) } + + test("types of fields are all NullTypes") { + // Test range partitioning code path. + val nulls = sql(s"SELECT null as a, null as b, null as c") + val df = nulls.unionAll(nulls).sort("a") + checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer]) + checkAnswer( + df, + Row(null, null, null) :: Row(null, null, null) :: Nil) + + // Test hash partitioning code path. 
+ val oneRow = sql(s"SELECT DISTINCT null, null, null FROM shuffle") + checkSerializer(oneRow.queryExecution.executedPlan, classOf[SparkSqlSerializer]) + checkAnswer( + oneRow, + Row(null, null, null)) + } } /** Tests SparkSqlSerializer2 with sort based shuffle without sort merge. */