author     hyukjinkwon <gurwls223@gmail.com>  2016-03-16 18:20:30 -0700
committer  Yin Huai <yhuai@databricks.com>    2016-03-16 18:20:30 -0700
commit     917f4000b418ee0997551d4dfabb369c4e9243a9 (patch)
tree       023b5befbfb4992d3156655ae6ff000524c649dd /sql/core/src
parent     ca9ef86c84ee84263f437a979017898f4bed0feb (diff)
[SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in the same field
## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/2400 added support for parsing JSON rows wrapped in an array. However, it throws an exception when the data contains an array value and a struct value in the same field, as below:

```json
{"a": {"b": 1}}
{"a": []}
```

with the schema given as:

```scala
val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)
```

- **Before**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 10, 192.168.1.170): java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50)
	at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	...
```

- **After**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```
+----+
|   a|
+----+
| [1]|
|null|
+----+
```

For other data types, mismatched values are converted to `null`; only this case throws an exception. This PR applies the wrapped-row handling only at the top level.

## How was this patch tested?

Unit tests, plus `./dev/run_tests` for style checks.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11752 from HyukjinKwon/SPARK-3308-follow-up.
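As a minimal, self-contained sketch of the scenario above (not part of this patch; it assumes a Spark shell-style `sqlContext`, and the temp-path handling is made up for illustration):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Write the two problematic records to a fresh temp path.
val path = java.nio.file.Files.createTempDirectory("spark-13719").toString + "/data"
sqlContext.sparkContext
  .parallelize("""{"a": {"b": 1}}""" :: """{"a": []}""" :: Nil)
  .saveAsTextFile(path)

val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)

// Before this patch the second record raised a ClassCastException; after it,
// the mismatched record is returned as a null row, so both records survive.
val jsonDF = sqlContext.read.schema(schema).json(path)
assert(jsonDF.count() == 2)
jsonDF.show()
```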
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala  |  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala |  37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala     |  19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala  |   5
4 files changed, 48 insertions, 14 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3fa5ebf1bb..751d78def4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.CharArrayWriter
import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b2f5c1e964..3252b6c77f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -49,8 +49,31 @@ object JacksonParser {
/**
* Parse the current token (and related children) according to a desired schema
+ * This is a wrapper for the method `convertField()` to handle a row wrapped
+ * with an array.
*/
- def convertField(
+ def convertRootField(
+ factory: JsonFactory,
+ parser: JsonParser,
+ schema: DataType): Any = {
+ import com.fasterxml.jackson.core.JsonToken._
+ (parser.getCurrentToken, schema) match {
+ case (START_ARRAY, st: StructType) =>
+ // SPARK-3308: support reading top level JSON arrays and take every element
+ // in such an array as a row
+ convertArray(factory, parser, st)
+
+ case (START_OBJECT, ArrayType(st, _)) =>
+ // the business end of SPARK-3308:
+ // when an object is found but an array is requested just wrap it in a list
+ convertField(factory, parser, st) :: Nil
+
+ case _ =>
+ convertField(factory, parser, schema)
+ }
+ }
+
+ private def convertField(
factory: JsonFactory,
parser: JsonParser,
schema: DataType): Any = {
@@ -157,19 +180,9 @@ object JacksonParser {
case (START_OBJECT, st: StructType) =>
convertObject(factory, parser, st)
- case (START_ARRAY, st: StructType) =>
- // SPARK-3308: support reading top level JSON arrays and take every element
- // in such an array as a row
- convertArray(factory, parser, st)
-
case (START_ARRAY, ArrayType(st, _)) =>
convertArray(factory, parser, st)
- case (START_OBJECT, ArrayType(st, _)) =>
- // the business end of SPARK-3308:
- // when an object is found but an array is requested just wrap it in a list
- convertField(factory, parser, st) :: Nil
-
case (START_OBJECT, MapType(StringType, kt, _)) =>
convertMap(factory, parser, kt)
@@ -264,7 +277,7 @@ object JacksonParser {
Utils.tryWithResource(factory.createParser(record)) { parser =>
parser.nextToken()
- convertField(factory, parser, schema) match {
+ convertRootField(factory, parser, schema) match {
case null => failedRecord(record)
case row: InternalRow => row :: Nil
case array: ArrayData =>
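To see the relocated dispatch in isolation, here is a small illustrative sketch (not Spark's internal `JacksonParser`; `rootBranch` is a hypothetical helper that only reports which branch a given root token/schema pair would take, mirroring the match added in `convertRootField`):

```scala
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.JsonToken._
import org.apache.spark.sql.types._

// Hypothetical stand-in for convertRootField's dispatch: the array/struct
// cross cases are handled once, at the root; everything else falls through
// to the ordinary per-field conversion.
def rootBranch(json: String, schema: DataType): String = {
  val parser = new JsonFactory().createParser(json)
  parser.nextToken()
  val branch = (parser.getCurrentToken, schema) match {
    case (START_ARRAY, _: StructType) =>
      "top-level array: each element becomes a row (SPARK-3308)"
    case (START_OBJECT, ArrayType(_, _)) =>
      "single object wrapped into a one-element result"
    case _ =>
      "plain convertField-style conversion"
  }
  parser.close()
  branch
}

val struct = StructType(StructField("a", StringType) :: Nil)
println(rootBranch("""[{"a": "1"}, {"a": "2"}]""", struct))  // array branch
println(rootBranch("""{"a": "1"}""", ArrayType(struct)))     // wrap branch
println(rootBranch("""{"a": "1"}""", struct))                 // default branch
```

Because this match now lives only in the root-level entry point, nested values such as `{"a": []}` against a struct-typed field no longer hit these cases and instead fail over to the regular corrupt-record handling.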
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4a8c128fa9..6d942c4c90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
parser.nextToken()
- JacksonParser.convertField(factory, parser, dataType)
+ JacksonParser.convertRootField(factory, parser, dataType)
}
}
@@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
+ test("Parse JSON rows having an array type and a struct type in the same field.") {
+ withTempDir { dir =>
+ val dir = Utils.createTempDir()
+ dir.delete()
+ val path = dir.getCanonicalPath
+ arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+ val schema =
+ StructType(
+ StructField("a", StructType(
+ StructField("b", StringType) :: Nil
+ )) :: Nil)
+ val jsonDF = sqlContext.read.schema(schema).json(path)
+ assert(jsonDF.count() == 2)
+ }
+ }
+
test("SPARK-12872 Support to specify the option for compression codec") {
withTempDir { dir =>
val dir = Utils.createTempDir()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a0836058d3..b2eff816ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
sqlContext.sparkContext.parallelize(
"""{"ts":1451732645}""" :: Nil)
+ def arrayAndStructRecords: RDD[String] =
+ sqlContext.sparkContext.parallelize(
+ """{"a": {"b": 1}}""" ::
+ """{"a": []}""" :: Nil)
+
lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())