path: root/sql/catalyst
author    hyukjinkwon <gurwls223@gmail.com>    2017-03-05 14:35:06 -0800
committer Burak Yavuz <brkyvz@gmail.com>       2017-03-05 14:35:06 -0800
commit    369a148e591bb16ec7da54867610b207602cd698 (patch)
tree      e7c2469ce548557bef43d3ccdb0fee6d5c006ec5 /sql/catalyst
parent    80d5338b32e856870cf187ce17bc87335d690761 (diff)
[SPARK-19595][SQL] Support json array in from_json
## What changes were proposed in this pull request?

This PR proposes two changes:

**Do not allow json arrays with multiple elements and return `null` in `from_json` with `StructType` as the schema.**

Currently, only a single row is read when the input is a json array. So, the code below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(StructField("a", IntegerType) :: Nil)
Seq("""[{"a": 1}, {"a": 2}]""").toDF("struct").select(from_json(col("struct"), schema)).show()
```

prints

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                 [1]|
+--------------------+
```

This PR changes the result to `null` when the schema is `StructType` and the input is a json array with multiple elements:

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                null|
+--------------------+
```

**Support json arrays in `from_json` with `ArrayType` as the schema.**

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
Seq("""[{"a": 1}, {"a": 2}]""").toDF("array").select(from_json(col("array"), schema)).show()
```

prints

```
+-------------------+
|jsontostruct(array)|
+-------------------+
|         [[1], [2]]|
+-------------------+
```

## How was this patch tested?

Unit tests in `JsonExpressionsSuite` and `JsonFunctionsSuite`, Python doctests, and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16929 from HyukjinKwon/disallow-array.
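For quick reference, here are the two behaviors side by side on the same input. This is a minimal sketch assuming a session with `spark.implicits._` in scope; the `json` column name is illustrative:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val df = Seq("""[{"a": 1}, {"a": 2}]""").toDF("json")
val structSchema = StructType(StructField("a", IntegerType) :: Nil)

// StructType schema: a json array with multiple elements now yields null.
df.select(from_json(col("json"), structSchema)).show()
// |              null|

// ArrayType schema: the same input parses into an array of rows.
df.select(from_json(col("json"), ArrayType(structSchema))).show()
// |        [[1], [2]]|
```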
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala  | 57
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 58
2 files changed, 107 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 1e690a4469..dbff62efdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.catalyst.util.{GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -480,23 +480,45 @@ case class JsonTuple(children: Seq[Expression])
}
/**
- * Converts an json input string to a [[StructType]] with the specified schema.
+ * Converts a json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
*/
case class JsonToStruct(
- schema: StructType,
+ schema: DataType,
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
- def this(schema: StructType, options: Map[String, String], child: Expression) =
+ def this(schema: DataType, options: Map[String, String], child: Expression) =
this(schema, options, child, None)
+ override def checkInputDataTypes(): TypeCheckResult = schema match {
+ case _: StructType | ArrayType(_: StructType, _) =>
+ super.checkInputDataTypes()
+ case _ => TypeCheckResult.TypeCheckFailure(
+ s"Input schema ${schema.simpleString} must be a struct or an array of structs.")
+ }
+
+ @transient
+ lazy val rowSchema = schema match {
+ case st: StructType => st
+ case ArrayType(st: StructType, _) => st
+ }
+
+ // This converts parsed rows to the desired output by the given schema.
+ @transient
+ lazy val converter = schema match {
+ case _: StructType =>
+ (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+ case ArrayType(_: StructType, _) =>
+ (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+ }
+
@transient
lazy val parser =
new JacksonParser(
- schema,
+ rowSchema,
new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
override def dataType: DataType = schema
@@ -505,11 +527,32 @@ case class JsonToStruct(
copy(timeZoneId = Option(timeZoneId))
override def nullSafeEval(json: Any): Any = {
+ // When input is,
+ // - `null`: `null`.
+ // - invalid json: `null`.
+ // - empty string: `null`.
+ //
+ // When the schema is array,
+ // - json array: `Array(Row(...), ...)`
+ // - json object: `Array(Row(...))`
+ // - empty json array: `Array()`.
+ // - empty json object: `Array(Row(null))`.
+ //
+ // When the schema is a struct,
+ // - json object/array with single element: `Row(...)`
+ // - json array with multiple elements: `null`
+ // - empty json array: `null`.
+ // - empty json object: `Row(null)`.
+
+ // We need `null` if the input string is an empty string. `JacksonParser` can
+ // deal with this but produces `Nil`.
+ if (json.toString.trim.isEmpty) return null
+
try {
- parser.parse(
+ converter(parser.parse(
json.asInstanceOf[UTF8String],
CreateJacksonParser.utf8String,
- identity[UTF8String]).headOption.orNull
+ identity[UTF8String]))
} catch {
case _: SparkSQLJsonProcessingException => null
}
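The core of the change is the `converter` dispatch introduced above: parsing always produces a `Seq[InternalRow]`, and the schema decides how that sequence becomes the final value. The following self-contained sketch reproduces just that dispatch with simplified stand-in types (illustrative only, not the actual Spark classes; `Row` here stands in for `InternalRow`):

```scala
object ConverterSketch {
  // Simplified stand-ins for the schema types; illustrative only.
  sealed trait Schema
  case object StructSchema extends Schema
  case object ArrayOfStructSchema extends Schema

  type Row = Seq[Any]

  // Mirrors the lazy `converter` above: a struct schema accepts exactly one
  // parsed row and returns null otherwise; an array schema keeps all rows.
  def converter(schema: Schema): Seq[Row] => Any = schema match {
    case StructSchema        => rows => if (rows.length == 1) rows.head else null
    case ArrayOfStructSchema => rows => rows
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Seq(1), Seq(2))
    println(converter(StructSchema)(rows))         // null: two rows under a struct schema
    println(converter(StructSchema)(rows.take(1))) // List(1): a single row passes through
    println(converter(ArrayOfStructSchema)(rows))  // List(List(1), List(2)): all rows kept
  }
}
```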
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 0c46819cdb..e3584909dd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -22,7 +22,7 @@ import java.util.Calendar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -372,6 +372,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
)
}
+ test("from_json - input=array, schema=array, output=array") {
+ val input = """[{"a": 1}, {"a": 2}]"""
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = InternalRow(1) :: InternalRow(2) :: Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=object, schema=array, output=array of single row") {
+ val input = """{"a": 1}"""
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = InternalRow(1) :: Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty array, schema=array, output=empty array") {
+ val input = "[ ]"
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty object, schema=array, output=array of single row with null") {
+ val input = "{ }"
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = InternalRow(null) :: Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=array of single object, schema=struct, output=single row") {
+ val input = """[{"a": 1}]"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = InternalRow(1)
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=array, schema=struct, output=null") {
+ val input = """[{"a": 1}, {"a": 2}]"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = null
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty array, schema=struct, output=null") {
+ val input = """[]"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = null
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty object, schema=struct, output=single row with null") {
+ val input = """{ }"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = InternalRow(null)
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(