author      Yin Huai <yhuai@databricks.com>        2015-07-07 18:36:35 -0700
committer   Josh Rosen <joshrosen@databricks.com>  2015-07-07 18:48:12 -0700
commit      d3d5f2ab25acaf261753738e63ae30a34d28c291 (patch)
tree        d91eb7156e4c136c88ba9fdfcbaf86f93b0cecca
parent      83a621a5a8f8a2991c4cfa687279589e5c623d46 (diff)
[SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns
https://issues.apache.org/jira/browse/SPARK-8868

Author: Yin Huai <yhuai@databricks.com>

Closes #7262 from yhuai/SPARK-8868 and squashes the following commits:

cb58780 [Yin Huai] Andrew's comment.
e456857 [Yin Huai] Josh's comments.
5122e65 [Yin Huai] If types of all columns are NullTypes, do not use serializer2.

(cherry picked from commit 68a4a169714e11d8c537ad9431ae9974f6b7e8d3)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala       25
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala  20
2 files changed, 39 insertions(+), 6 deletions(-)
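For orientation before the diff, here is a standalone sketch of the fixed predicate (spark-shell style; supportSketch is an illustrative name, and the body is behaviorally equivalent to the patched support() below rather than the committed code itself):

    import org.apache.spark.sql.types._

    // Accept a schema only if it has no complex types/UDTs and at least one
    // field that is not NullType; an all-NullType row serializes to zero
    // bytes, so the reading side could never make progress.
    def supportSketch(schema: Array[DataType]): Boolean = {
      if (schema == null) return true
      val hasComplex = schema.exists {
        case _: UserDefinedType[_] | _: ArrayType | _: MapType | _: StructType => true
        case _ => false
      }
      !hasComplex && !schema.forall(_ == NullType)
    }

    supportSketch(Array(NullType, NullType))    // false: nothing written per row
    supportSketch(Array(NullType, IntegerType)) // true: one concrete column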
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 256d527d7b..65c0e52012 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -186,23 +186,38 @@ private[sql] object SparkSqlSerializer2 {
/**
* Check if rows with the given schema can be serialized with ShuffleSerializer.
+ * Right now, we do not support a schema that has complex types or UDTs, or one
+ * whose fields are all NullTypes.
*/
def support(schema: Array[DataType]): Boolean = {
if (schema == null) return true
+ var allNullTypes = true
var i = 0
while (i < schema.length) {
schema(i) match {
- case udt: UserDefinedType[_] => return false
- case array: ArrayType => return false
- case map: MapType => return false
- case struct: StructType => return false
+ case NullType => // Do nothing
+ case udt: UserDefinedType[_] =>
+ allNullTypes = false
+ return false
+ case array: ArrayType =>
+ allNullTypes = false
+ return false
+ case map: MapType =>
+ allNullTypes = false
+ return false
+ case struct: StructType =>
+ allNullTypes = false
+ return false
case _ =>
+ allNullTypes = false
}
i += 1
}
- return true
+ // If the types of all fields are NullTypes, return false;
+ // otherwise, the schema is supported.
+ return !allNullTypes
}
/**
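In practice the predicate is consulted when planning the shuffle; a hedged illustration of the new behavior (support is private[sql], so this only compiles from within org.apache.spark.sql packages):

    val allNulls: Array[DataType] = Array(NullType, NullType, NullType)
    SparkSqlSerializer2.support(allNulls)           // false after this patch
    SparkSqlSerializer2.support(Array(IntegerType)) // true, as before
    // With support() == false, the shuffle falls back to the generic
    // SparkSqlSerializer, which the new tests below assert via checkSerializer.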
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
index 6ca5390cde..05af102da5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
@@ -42,7 +42,6 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
}
checkSupported(null, isSupported = true)
- checkSupported(NullType, isSupported = true)
checkSupported(BooleanType, isSupported = true)
checkSupported(ByteType, isSupported = true)
checkSupported(ShortType, isSupported = true)
@@ -57,6 +56,8 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
checkSupported(DecimalType(10, 5), isSupported = true)
checkSupported(DecimalType.Unlimited, isSupported = true)
+ // If NullType is the only data type in the schema, we do not support it.
+ checkSupported(NullType, isSupported = false)
// For now, ArrayType, MapType, and StructType are not supported.
checkSupported(ArrayType(DoubleType, true), isSupported = false)
checkSupported(ArrayType(StringType, false), isSupported = false)
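The checkSupported helper used above is defined earlier in this suite; a sketch reconstructed from its call sites (treat the body as an assumption, not the suite's exact code):

    def checkSupported(dataType: DataType, isSupported: Boolean): Unit = {
      val schema = if (dataType == null) null else Array(dataType)
      assert(SparkSqlSerializer2.support(schema) === isSupported)
    }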
@@ -169,6 +170,23 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
val df = sql(s"SELECT 1 + 1 FROM shuffle")
checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer])
}
+
+ test("types of fields are all NullTypes") {
+ // Test range partitioning code path.
+ val nulls = sql(s"SELECT null as a, null as b, null as c")
+ val df = nulls.unionAll(nulls).sort("a")
+ checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer])
+ checkAnswer(
+ df,
+ Row(null, null, null) :: Row(null, null, null) :: Nil)
+
+ // Test hash partitioning code path.
+ val oneRow = sql(s"SELECT DISTINCT null, null, null FROM shuffle")
+ checkSerializer(oneRow.queryExecution.executedPlan, classOf[SparkSqlSerializer])
+ checkAnswer(
+ oneRow,
+ Row(null, null, null))
+ }
}
/** Tests SparkSqlSerializer2 with sort based shuffle without sort merge. */
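For reference, a minimal spark-shell style repro of the hang this patch fixes (sqlContext and the shuffle temp table come from the suite's fixtures; a sketch, not suite code):

    // Range-partitioning path: before this patch, sorting an all-NullType
    // result hung because serializer2 wrote zero bytes per row.
    val nulls = sqlContext.sql("SELECT null AS a, null AS b, null AS c")
    nulls.unionAll(nulls).sort("a").collect() // now returns two all-null rows

    // Hash-partitioning path: DISTINCT over all-null columns also shuffles.
    sqlContext.sql("SELECT DISTINCT null, null, null FROM shuffle").collect()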