aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2015-03-11 14:09:09 +0000
committerSean Owen <sowen@cloudera.com>2015-03-11 14:09:09 +0000
commit55c4831d68c8326380086b5540244f984ea9ec27 (patch)
treeff47be6ab3d9bc3a9b5039e1d183f0d7e9c2512f /sql
parent2d87a415f20c85487537d6791a73827ff537f2c0 (diff)
downloadspark-55c4831d68c8326380086b5540244f984ea9ec27.tar.gz
spark-55c4831d68c8326380086b5540244f984ea9ec27.tar.bz2
spark-55c4831d68c8326380086b5540244f984ea9ec27.zip
SPARK-6245 [SQL] jsonRDD() of empty RDD results in exception
Avoid `UnsupportedOperationException` from JsonRDD.inferSchema on empty RDD. Not sure if this is supposed to be an error (but a better one), but it seems like this case can come up if the input is down-sampled so much that nothing is sampled. Now stuff like this: ``` sqlContext.jsonRDD(sc.parallelize(List[String]())) ``` just results in ``` org.apache.spark.sql.DataFrame = [] ``` Author: Sean Owen <sowen@cloudera.com> Closes #4971 from srowen/SPARK-6245 and squashes the following commits: 3699964 [Sean Owen] Set() -> Set.empty 3c619e1 [Sean Owen] Avoid UnsupportedOperationException from JsonRDD.inferSchema on empty RDD
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala3
3 files changed, 15 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index e54a2a3679..2b0358c4e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -48,7 +48,11 @@ private[sql] object JsonRDD extends Logging {
require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
val allKeys =
- parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
+ if (schemaData.isEmpty()) {
+ Set.empty[(String,DataType)]
+ } else {
+ parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
+ }
createSchema(allKeys)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 0c21f725f0..320b80d80e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1033,4 +1033,11 @@ class JsonSuite extends QueryTest {
assert(!logicalRelation2.sameResult(logicalRelation3),
s"$logicalRelation2 and $logicalRelation3 should be considered not having the same result.")
}
+
+ test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
+ // This is really a test that it doesn't throw an exception
+ val emptySchema = JsonRDD.inferSchema(empty, 1.0, "")
+ assert(StructType(Seq()) === emptySchema)
+ }
+
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index 15698f61e0..47a97a49da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -185,4 +185,7 @@ object TestJsonData {
"""{"a":{, b:3}""" ::
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
"""]""" :: Nil)
+
+ val empty =
+ TestSQLContext.sparkContext.parallelize(Seq[String]())
}