diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2017-03-19 22:33:01 -0700 |
---|---|---|
committer | Felix Cheung <felixcheung@apache.org> | 2017-03-19 22:33:01 -0700 |
commit | 0cdcf9114527a2c359c25e46fd6556b3855bfb28 (patch) | |
tree | b315a01420500d41669e9436658626f8890b7143 /sql/catalyst | |
parent | 990af630d0d569880edd9c7ce9932e10037a28ab (diff) | |
download | spark-0cdcf9114527a2c359c25e46fd6556b3855bfb28.tar.gz spark-0cdcf9114527a2c359c25e46fd6556b3855bfb28.tar.bz2 spark-0cdcf9114527a2c359c25e46fd6556b3855bfb28.zip |
[SPARK-19849][SQL] Support ArrayType in to_json to produce JSON array
## What changes were proposed in this pull request?
This PR proposes to support an array of struct type in `to_json` as below:
```scala
import org.apache.spark.sql.functions._
val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
df.select(to_json($"a").as("json")).show()
```
```
+----------+
| json|
+----------+
|[{"_1":1}]|
+----------+
```
Currently, it throws an exception as below (a newline manually inserted for readability):
```
org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to data type
mismatch: structtojson requires that the expression is a struct expression.;;
```
This allows the roundtrip with `from_json` as below:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", schema).as("array"))
df.show()
// Read back.
df.select(to_json($"array").as("json")).show()
```
```
+----------+
| array|
+----------+
|[[1], [2]]|
+----------+
+-----------------+
| json|
+-----------------+
|[{"a":1},{"a":2}]|
+-----------------+
```
Also, this PR proposes to rename from `StructToJson` to `StructsToJson ` and `JsonToStruct` to `JsonToStructs`.
## How was this patch tested?
Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, doctest for Python and test in `test_sparkSQL.R` for R.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17192 from HyukjinKwon/SPARK-19849.
Diffstat (limited to 'sql/catalyst')
4 files changed, 113 insertions, 61 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 0486e67dbd..e1d83a86f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -425,8 +425,8 @@ object FunctionRegistry { expression[BitwiseXor]("^"), // json - expression[StructToJson]("to_json"), - expression[JsonToStruct]("from_json"), + expression[StructsToJson]("to_json"), + expression[JsonToStructs]("from_json"), // Cast aliases (SPARK-16730) castAlias("boolean", BooleanType), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 37e4bb5060..e4e08a8665 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, ParseModes} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -482,7 +482,8 @@ case class JsonTuple(children: Seq[Expression]) } /** - * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema. + * Converts an json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s + * with the specified schema. */ // scalastyle:off line.size.limit @ExpressionDescription( @@ -495,7 +496,7 @@ case class JsonTuple(children: Seq[Expression]) {"time":"2015-08-26 00:00:00.0"} """) // scalastyle:on line.size.limit -case class JsonToStruct( +case class JsonToStructs( schema: DataType, options: Map[String, String], child: Expression, @@ -590,7 +591,7 @@ case class JsonToStruct( } /** - * Converts a [[StructType]] to a json output string. + * Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json output string. */ // scalastyle:off line.size.limit @ExpressionDescription( @@ -601,9 +602,11 @@ case class JsonToStruct( {"a":1,"b":2} > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')); {"time":"26/08/2015"} + > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)); + [{"a":1,"b":2}] """) // scalastyle:on line.size.limit -case class StructToJson( +case class StructsToJson( options: Map[String, String], child: Expression, timeZoneId: Option[String] = None) @@ -624,41 +627,58 @@ case class StructToJson( lazy val writer = new CharArrayWriter() @transient - lazy val gen = - new JacksonGenerator( - child.dataType.asInstanceOf[StructType], - writer, - new JSONOptions(options, timeZoneId.get)) + lazy val gen = new JacksonGenerator( + rowSchema, writer, new JSONOptions(options, timeZoneId.get)) + + @transient + lazy val rowSchema = child.dataType match { + case st: StructType => st + case ArrayType(st: StructType, _) => st + } + + // This converts rows to the JSON output according to the given schema. + @transient + lazy val converter: Any => UTF8String = { + def getAndReset(): UTF8String = { + gen.flush() + val json = writer.toString + writer.reset() + UTF8String.fromString(json) + } + + child.dataType match { + case _: StructType => + (row: Any) => + gen.write(row.asInstanceOf[InternalRow]) + getAndReset() + case ArrayType(_: StructType, _) => + (arr: Any) => + gen.write(arr.asInstanceOf[ArrayData]) + getAndReset() + } + } override def dataType: DataType = StringType - override def checkInputDataTypes(): TypeCheckResult = { - if (StructType.acceptsType(child.dataType)) { + override def checkInputDataTypes(): TypeCheckResult = child.dataType match { + case _: StructType | ArrayType(_: StructType, _) => try { - JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType]) + JacksonUtils.verifySchema(rowSchema) TypeCheckResult.TypeCheckSuccess } catch { case e: UnsupportedOperationException => TypeCheckResult.TypeCheckFailure(e.getMessage) } - } else { - TypeCheckResult.TypeCheckFailure( - s"$prettyName requires that the expression is a struct expression.") - } + case _ => TypeCheckResult.TypeCheckFailure( + s"Input type ${child.dataType.simpleString} must be a struct or array of structs.") } override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = copy(timeZoneId = Option(timeZoneId)) - override def nullSafeEval(row: Any): Any = { - gen.write(row.asInstanceOf[InternalRow]) - gen.flush() - val json = writer.toString - writer.reset() - UTF8String.fromString(json) - } + override def nullSafeEval(value: Any): Any = converter(value) - override def inputTypes: Seq[AbstractDataType] = StructType :: Nil + override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil } object JsonExprUtils { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index dec55279c9..1d302aea6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -37,6 +37,10 @@ private[sql] class JacksonGenerator( // `ValueWriter`s for all fields of the schema private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray + // `ValueWriter` for array data storing rows of the schema. + private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: Int) => { + writeObject(writeFields(arr.getStruct(i, schema.length), schema, rootFieldWriters)) + } private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) @@ -185,17 +189,18 @@ private[sql] class JacksonGenerator( def flush(): Unit = gen.flush() /** - * Transforms a single InternalRow to JSON using Jackson + * Transforms a single `InternalRow` to JSON object using Jackson * * @param row The row to convert */ - def write(row: InternalRow): Unit = { - writeObject { - writeFields(row, schema, rootFieldWriters) - } - } + def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters)) - def writeLineEnding(): Unit = { - gen.writeRaw('\n') - } + /** + * Transforms multiple `InternalRow`s to JSON array using Jackson + * + * @param array The array of rows to convert + */ + def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) + + def writeLineEnding(): Unit = gen.writeRaw('\n') } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 19d0c8eb92..e4698d4463 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -21,7 +21,7 @@ import java.util.Calendar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes} +import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -352,7 +352,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val jsonData = """{"a": 1}""" val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), + JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId), InternalRow(1) ) } @@ -361,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val jsonData = """{"a" 1}""" val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), + JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId), null ) // Other modes should still return `null`. checkEvaluation( - JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId), + JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId), null ) } @@ -376,62 +376,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val input = """[{"a": 1}, {"a": 2}]""" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = InternalRow(1) :: InternalRow(2) :: Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=object, schema=array, output=array of single row") { val input = """{"a": 1}""" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = InternalRow(1) :: Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty array, schema=array, output=empty array") { val input = "[ ]" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty object, schema=array, output=array of single row with null") { val input = "{ }" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = InternalRow(null) :: Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=array of single object, schema=struct, output=single row") { val input = """[{"a": 1}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = InternalRow(1) - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=array, schema=struct, output=null") { val input = """[{"a": 1}, {"a": 2}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = null - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty array, schema=struct, output=null") { val input = """[]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = null - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty object, schema=struct, output=single row with null") { val input = """{ }""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = InternalRow(null) - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId), + JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), null ) } @@ -444,14 +444,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 123) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), + JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId), InternalRow(c.getTimeInMillis * 1000L) ) // The result doesn't change because the json string includes timezone string ("Z" here), // which means the string represents the timestamp string in the timezone regardless of // the timeZoneId parameter. checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")), + JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")), InternalRow(c.getTimeInMillis * 1000L) ) @@ -461,7 +461,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 0) checkEvaluation( - JsonToStruct( + JsonToStructs( schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), Literal(jsonData2), @@ -469,7 +469,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { InternalRow(c.getTimeInMillis * 1000L) ) checkEvaluation( - JsonToStruct( + JsonToStructs( schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", DateTimeUtils.TIMEZONE_OPTION -> tz.getID), @@ -483,25 +483,52 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("SPARK-19543: from_json empty input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId), + JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId), null ) } - test("to_json") { + test("to_json - struct") { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(create_row(1), schema) checkEvaluation( - StructToJson(Map.empty, struct, gmtId), + StructsToJson(Map.empty, struct, gmtId), """{"a":1}""" ) } + test("to_json - array") { + val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil) + val output = """[{"a":1},{"a":2}]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + output) + } + + test("to_json - array with single empty row") { + val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val input = new GenericArrayData(InternalRow(null) :: Nil) + val output = """[{}]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + output) + } + + test("to_json - empty array") { + val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val input = new GenericArrayData(Nil) + val output = """[]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + output) + } + test("to_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(null, schema) checkEvaluation( - StructToJson(Map.empty, struct, gmtId), + StructsToJson(Map.empty, struct, gmtId), null ) } @@ -514,16 +541,16 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema) checkEvaluation( - StructToJson(Map.empty, struct, gmtId), + StructsToJson(Map.empty, struct, gmtId), """{"t":"2016-01-01T00:00:00.000Z"}""" ) checkEvaluation( - StructToJson(Map.empty, struct, Option("PST")), + StructsToJson(Map.empty, struct, Option("PST")), """{"t":"2015-12-31T16:00:00.000-08:00"}""" ) checkEvaluation( - StructToJson( + StructsToJson( Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", DateTimeUtils.TIMEZONE_OPTION -> gmtId.get), struct, @@ -531,7 +558,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { """{"t":"2016-01-01T00:00:00"}""" ) checkEvaluation( - StructToJson( + StructsToJson( Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", DateTimeUtils.TIMEZONE_OPTION -> "PST"), struct, |