author    hyukjinkwon <gurwls223@gmail.com>    2017-03-19 22:33:01 -0700
committer Felix Cheung <felixcheung@apache.org>    2017-03-19 22:33:01 -0700
commit    0cdcf9114527a2c359c25e46fd6556b3855bfb28 (patch)
tree      b315a01420500d41669e9436658626f8890b7143 /sql/catalyst
parent    990af630d0d569880edd9c7ce9932e10037a28ab (diff)
download  spark-0cdcf9114527a2c359c25e46fd6556b3855bfb28.tar.gz
          spark-0cdcf9114527a2c359c25e46fd6556b3855bfb28.tar.bz2
          spark-0cdcf9114527a2c359c25e46fd6556b3855bfb28.zip
[SPARK-19849][SQL] Support ArrayType in to_json to produce JSON array
## What changes were proposed in this pull request?

This PR proposes to support an array of struct type in `to_json` as below:

```scala
import org.apache.spark.sql.functions._

val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
df.select(to_json($"a").as("json")).show()
```

```
+----------+
|      json|
+----------+
|[{"_1":1}]|
+----------+
```

Currently, it throws an exception as below (a newline manually inserted for readability):

```
org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to data type
mismatch: structtojson requires that the expression is a struct expression.;;
```

This allows the roundtrip with `from_json` as below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json")
  .select(from_json($"json", schema).as("array"))
df.show()

// Read back.
df.select(to_json($"array").as("json")).show()
```

```
+----------+
|     array|
+----------+
|[[1], [2]]|
+----------+

+-----------------+
|             json|
+-----------------+
|[{"a":1},{"a":2}]|
+-----------------+
```

Also, this PR proposes to rename `StructToJson` to `StructsToJson` and `JsonToStruct` to `JsonToStructs`.

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, a doctest for Python, and a test in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17192 from HyukjinKwon/SPARK-19849.
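The new behaviour is also reachable from plain SQL, since the registered names are unchanged (see the `FunctionRegistry` diff below). A minimal sketch, assuming an active `SparkSession` named `spark`; the query and its result mirror the `ExpressionDescription` example added in `jsonExpressions.scala`:

```scala
// Hypothetical spark-shell snippet: `spark` is assumed to be an active
// SparkSession. The query mirrors the SQL example added to StructsToJson's
// ExpressionDescription further down in this diff.
val result = spark.sql("SELECT to_json(array(named_struct('a', 1, 'b', 2))) AS json")
result.show()
// +---------------+
// |           json|
// +---------------+
// |[{"a":1,"b":2}]|
// +---------------+
```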
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala         4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala      70
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala            23
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala 77
4 files changed, 113 insertions, 61 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 0486e67dbd..e1d83a86f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -425,8 +425,8 @@ object FunctionRegistry {
expression[BitwiseXor]("^"),
// json
- expression[StructToJson]("to_json"),
- expression[JsonToStruct]("from_json"),
+ expression[StructsToJson]("to_json"),
+ expression[JsonToStructs]("from_json"),
// Cast aliases (SPARK-16730)
castAlias("boolean", BooleanType),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 37e4bb5060..e4e08a8665 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -482,7 +482,8 @@ case class JsonTuple(children: Seq[Expression])
}
/**
- * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
+ * Converts a json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s
+ * with the specified schema.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
@@ -495,7 +496,7 @@ case class JsonTuple(children: Seq[Expression])
{"time":"2015-08-26 00:00:00.0"}
""")
// scalastyle:on line.size.limit
-case class JsonToStruct(
+case class JsonToStructs(
schema: DataType,
options: Map[String, String],
child: Expression,
@@ -590,7 +591,7 @@ case class JsonToStruct(
}
/**
- * Converts a [[StructType]] to a json output string.
+ * Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json output string.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
@@ -601,9 +602,11 @@ case class JsonToStruct(
{"a":1,"b":2}
> SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
{"time":"26/08/2015"}
+ > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)));
+ [{"a":1,"b":2}]
""")
// scalastyle:on line.size.limit
-case class StructToJson(
+case class StructsToJson(
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
@@ -624,41 +627,58 @@ case class StructToJson(
lazy val writer = new CharArrayWriter()
@transient
- lazy val gen =
- new JacksonGenerator(
- child.dataType.asInstanceOf[StructType],
- writer,
- new JSONOptions(options, timeZoneId.get))
+ lazy val gen = new JacksonGenerator(
+ rowSchema, writer, new JSONOptions(options, timeZoneId.get))
+
+ @transient
+ lazy val rowSchema = child.dataType match {
+ case st: StructType => st
+ case ArrayType(st: StructType, _) => st
+ }
+
+ // This converts rows to the JSON output according to the given schema.
+ @transient
+ lazy val converter: Any => UTF8String = {
+ def getAndReset(): UTF8String = {
+ gen.flush()
+ val json = writer.toString
+ writer.reset()
+ UTF8String.fromString(json)
+ }
+
+ child.dataType match {
+ case _: StructType =>
+ (row: Any) =>
+ gen.write(row.asInstanceOf[InternalRow])
+ getAndReset()
+ case ArrayType(_: StructType, _) =>
+ (arr: Any) =>
+ gen.write(arr.asInstanceOf[ArrayData])
+ getAndReset()
+ }
+ }
override def dataType: DataType = StringType
- override def checkInputDataTypes(): TypeCheckResult = {
- if (StructType.acceptsType(child.dataType)) {
+ override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+ case _: StructType | ArrayType(_: StructType, _) =>
try {
- JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
+ JacksonUtils.verifySchema(rowSchema)
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
- } else {
- TypeCheckResult.TypeCheckFailure(
- s"$prettyName requires that the expression is a struct expression.")
- }
+ case _ => TypeCheckResult.TypeCheckFailure(
+ s"Input type ${child.dataType.simpleString} must be a struct or array of structs.")
}
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
- override def nullSafeEval(row: Any): Any = {
- gen.write(row.asInstanceOf[InternalRow])
- gen.flush()
- val json = writer.toString
- writer.reset()
- UTF8String.fromString(json)
- }
+ override def nullSafeEval(value: Any): Any = converter(value)
- override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+ override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil
}
object JsonExprUtils {
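The core of the `StructsToJson` change above is the lazy `converter` value: `child.dataType` is matched once, when the lazy val is first forced, and the resulting function is reused for every input value, sharing one generator and buffer via `getAndReset()`. A standalone sketch of that dispatch-once pattern in plain Scala (no Spark dependencies; every type and name here is an illustrative stand-in, not a Spark API):

```scala
sealed trait DataTypeLike
case object StructLike extends DataTypeLike
case object ArrayOfStructLike extends DataTypeLike

object ConverterSketch {
  private val buffer = new StringBuilder

  // Stand-ins for gen.write(row: InternalRow) / gen.write(arr: ArrayData).
  private def writeStruct(fields: Map[String, Int]): Unit =
    buffer ++= fields.map { case (k, v) => s""""$k":$v""" }.mkString("{", ",", "}")

  private def writeArray(rows: Seq[Map[String, Int]]): Unit = {
    buffer += '['
    rows.zipWithIndex.foreach { case (row, i) =>
      if (i > 0) buffer += ','
      writeStruct(row)
    }
    buffer += ']'
  }

  // Mirrors getAndReset() in the diff: drain the shared buffer between values.
  private def getAndReset(): String = {
    val json = buffer.toString
    buffer.clear()
    json
  }

  // Mirrors the lazy `converter`: match on the input type once, then reuse
  // the returned function for every value.
  def makeConverter(dt: DataTypeLike): Any => String = dt match {
    case StructLike =>
      (v: Any) => { writeStruct(v.asInstanceOf[Map[String, Int]]); getAndReset() }
    case ArrayOfStructLike =>
      (v: Any) => { writeArray(v.asInstanceOf[Seq[Map[String, Int]]]); getAndReset() }
  }

  def main(args: Array[String]): Unit = {
    val toJson = makeConverter(ArrayOfStructLike)
    println(toJson(Seq(Map("a" -> 1), Map("a" -> 2))))  // [{"a":1},{"a":2}]
  }
}
```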
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index dec55279c9..1d302aea6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -37,6 +37,10 @@ private[sql] class JacksonGenerator(
// `ValueWriter`s for all fields of the schema
private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
+ // `ValueWriter` for array data storing rows of the schema.
+ private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: Int) => {
+ writeObject(writeFields(arr.getStruct(i, schema.length), schema, rootFieldWriters))
+ }
private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
@@ -185,17 +189,18 @@ private[sql] class JacksonGenerator(
def flush(): Unit = gen.flush()
/**
- * Transforms a single InternalRow to JSON using Jackson
+ * Transforms a single `InternalRow` to JSON object using Jackson
*
* @param row The row to convert
*/
- def write(row: InternalRow): Unit = {
- writeObject {
- writeFields(row, schema, rootFieldWriters)
- }
- }
+ def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
- def writeLineEnding(): Unit = {
- gen.writeRaw('\n')
- }
+ /**
+ * Transforms multiple `InternalRow`s to JSON array using Jackson
+ *
+ * @param array The array of rows to convert
+ */
+ def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
+
+ def writeLineEnding(): Unit = gen.writeRaw('\n')
}
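The `JacksonGenerator` change above boils down to bracketing the existing per-row object writer with array start/end tokens, reusing `rootFieldWriters` through `arrElementWriter`. A minimal sketch of the equivalent raw jackson-core calls (the `Seq`-based row representation is a stand-in for Spark's `InternalRow`/`ArrayData`, not the real machinery):

```scala
import java.io.CharArrayWriter
import com.fasterxml.jackson.core.JsonFactory

object JacksonArraySketch extends App {
  val writer = new CharArrayWriter()
  // Same construction as JacksonGenerator in the diff above.
  val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

  // Corresponds to write(row: InternalRow): one object per row, fields in order.
  def writeRow(fields: Seq[(String, Int)]): Unit = {
    gen.writeStartObject()
    fields.foreach { case (name, value) =>
      gen.writeFieldName(name)
      gen.writeNumber(value)
    }
    gen.writeEndObject()
  }

  // Corresponds to the new write(array: ArrayData) overload: the same object
  // writer, bracketed by the array start/end tokens.
  def writeRows(rows: Seq[Seq[(String, Int)]]): Unit = {
    gen.writeStartArray()
    rows.foreach(writeRow)
    gen.writeEndArray()
  }

  writeRows(Seq(Seq("a" -> 1), Seq("a" -> 2)))
  gen.flush()
  println(writer.toString)  // [{"a":1},{"a":2}]
}
```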
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 19d0c8eb92..e4698d4463 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -21,7 +21,7 @@ import java.util.Calendar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -352,7 +352,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+ JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
InternalRow(1)
)
}
@@ -361,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+ JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
null
)
// Other modes should still return `null`.
checkEvaluation(
- JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
+ JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
null
)
}
@@ -376,62 +376,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: InternalRow(2) :: Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=object, schema=array, output=array of single row") {
val input = """{"a": 1}"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty array, schema=array, output=empty array") {
val input = "[ ]"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty object, schema=array, output=array of single row with null") {
val input = "{ }"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(null) :: Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=array of single object, schema=struct, output=single row") {
val input = """[{"a": 1}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(1)
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=array, schema=struct, output=null") {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty array, schema=struct, output=null") {
val input = """[]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty object, schema=struct, output=single row with null") {
val input = """{ }"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(null)
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
+ JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
null
)
}
@@ -444,14 +444,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
c.set(2016, 0, 1, 0, 0, 0)
c.set(Calendar.MILLISECOND, 123)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+ JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
InternalRow(c.getTimeInMillis * 1000L)
)
// The result doesn't change because the json string includes timezone string ("Z" here),
// which means the string represents the timestamp string in the timezone regardless of
// the timeZoneId parameter.
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+ JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
InternalRow(c.getTimeInMillis * 1000L)
)
@@ -461,7 +461,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
c.set(2016, 0, 1, 0, 0, 0)
c.set(Calendar.MILLISECOND, 0)
checkEvaluation(
- JsonToStruct(
+ JsonToStructs(
schema,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
Literal(jsonData2),
@@ -469,7 +469,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
InternalRow(c.getTimeInMillis * 1000L)
)
checkEvaluation(
- JsonToStruct(
+ JsonToStructs(
schema,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
@@ -483,25 +483,52 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("SPARK-19543: from_json empty input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
+ JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
null
)
}
- test("to_json") {
+ test("to_json - struct") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(create_row(1), schema)
checkEvaluation(
- StructToJson(Map.empty, struct, gmtId),
+ StructsToJson(Map.empty, struct, gmtId),
"""{"a":1}"""
)
}
+ test("to_json - array") {
+ val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
+ val output = """[{"a":1},{"a":2}]"""
+ checkEvaluation(
+ StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+ output)
+ }
+
+ test("to_json - array with single empty row") {
+ val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val input = new GenericArrayData(InternalRow(null) :: Nil)
+ val output = """[{}]"""
+ checkEvaluation(
+ StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+ output)
+ }
+
+ test("to_json - empty array") {
+ val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val input = new GenericArrayData(Nil)
+ val output = """[]"""
+ checkEvaluation(
+ StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+ output)
+ }
+
test("to_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(null, schema)
checkEvaluation(
- StructToJson(Map.empty, struct, gmtId),
+ StructsToJson(Map.empty, struct, gmtId),
null
)
}
@@ -514,16 +541,16 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
checkEvaluation(
- StructToJson(Map.empty, struct, gmtId),
+ StructsToJson(Map.empty, struct, gmtId),
"""{"t":"2016-01-01T00:00:00.000Z"}"""
)
checkEvaluation(
- StructToJson(Map.empty, struct, Option("PST")),
+ StructsToJson(Map.empty, struct, Option("PST")),
"""{"t":"2015-12-31T16:00:00.000-08:00"}"""
)
checkEvaluation(
- StructToJson(
+ StructsToJson(
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> gmtId.get),
struct,
@@ -531,7 +558,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
"""{"t":"2016-01-01T00:00:00"}"""
)
checkEvaluation(
- StructToJson(
+ StructsToJson(
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> "PST"),
struct,