author     hyukjinkwon <gurwls223@gmail.com>  2017-03-19 22:33:01 -0700
committer  Felix Cheung <felixcheung@apache.org>  2017-03-19 22:33:01 -0700
commit     0cdcf9114527a2c359c25e46fd6556b3855bfb28 (patch)
tree       b315a01420500d41669e9436658626f8890b7143
parent     990af630d0d569880edd9c7ce9932e10037a28ab (diff)
[SPARK-19849][SQL] Support ArrayType in to_json to produce JSON array
## What changes were proposed in this pull request?

This PR proposes to support an array of struct type in `to_json` as below:

```scala
import org.apache.spark.sql.functions._

val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
df.select(to_json($"a").as("json")).show()
```

```
+----------+
|      json|
+----------+
|[{"_1":1}]|
+----------+
```

Currently, it throws an exception as below (a newline manually inserted for readability):

```
org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to
data type mismatch: structtojson requires that the expression is a struct expression.;;
```

This allows the roundtrip with `from_json` as below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", schema).as("array"))
df.show()

// Read back.
df.select(to_json($"array").as("json")).show()
```

```
+----------+
|     array|
+----------+
|[[1], [2]]|
+----------+

+-----------------+
|             json|
+-----------------+
|[{"a":1},{"a":2}]|
+-----------------+
```

Also, this PR proposes to rename `StructToJson` to `StructsToJson` and `JsonToStruct` to `JsonToStructs`.

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, a doctest for Python, and a test in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17192 from HyukjinKwon/SPARK-19849.
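The new array path is also exposed in SQL (see `json-functions.sql` below). A minimal sketch of invoking it from the Scala side, assuming an active `SparkSession` bound to `spark` as in `spark-shell`:

```scala
// Hedged sketch mirroring the new SQL test case added in this patch;
// assumes `spark` is an active SparkSession (e.g. in spark-shell).
spark.sql("SELECT to_json(array(named_struct('a', 1, 'b', 2))) AS json").show()
// +---------------+
// |           json|
// +---------------+
// |[{"a":1,"b":2}]|
// +---------------+
```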
-rw-r--r--  R/pkg/R/functions.R | 18
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R | 4
-rw-r--r--  python/pyspark/sql/functions.py | 15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala | 70
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala | 23
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 77
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 34
-rw-r--r--  sql/core/src/test/resources/sql-tests/inputs/json-functions.sql | 1
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/json-functions.sql.out | 96
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 26
11 files changed, 236 insertions(+), 132 deletions(-)
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 9867f2d5b7..2cff3ac08c 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1795,10 +1795,10 @@ setMethod("to_date",
#' to_json
#'
-#' Converts a column containing a \code{structType} into a Column of JSON string.
-#' Resolving the Column can fail if an unsupported type is encountered.
+#' Converts a column containing a \code{structType} or array of \code{structType} into a Column
+#' of JSON string. Resolving the Column can fail if an unsupported type is encountered.
#'
-#' @param x Column containing the struct
+#' @param x Column containing the struct or array of structs
#' @param ... additional named properties to control how it is converted, accepts the same options
#' as the JSON data source.
#'
@@ -1809,8 +1809,13 @@ setMethod("to_date",
#' @export
#' @examples
#' \dontrun{
-#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
-#' select(df, to_json(df$t))
+#' # Converts a struct into a JSON object
+#' df <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
+#' select(df, to_json(df$d, dateFormat = 'dd/MM/yyyy'))
+#'
+#' # Converts an array of structs into a JSON array
+#' df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+#' select(df, to_json(df$people))
#'}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
@@ -2433,7 +2438,8 @@ setMethod("date_format", signature(y = "Column", x = "character"),
#' from_json
#'
#' Parses a column containing a JSON string into a Column of \code{structType} with the specified
-#' \code{schema}. If the string is unparseable, the Column will contains the value NA.
+#' \code{schema} or array of \code{structType} if \code{asJsonArray} is set to \code{TRUE}.
+#' If the string is unparseable, the Column will contain the value NA.
#'
#' @param x Column containing the JSON string.
#' @param schema a structType object to use as the schema to use when parsing the JSON string.
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 32856b399c..9c38e0d866 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1340,6 +1340,10 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
# Test to_json(), from_json()
+ df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+ j <- collect(select(df, alias(to_json(df$people), "json")))
+ expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+
df <- read.json(mapTypeJsonPath)
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
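The new R assertion above has a direct Scala analogue. A hedged sketch, assuming an active `SparkSession` named `spark` with its implicits imported; `Person` is a hypothetical case class used only for illustration:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes an active SparkSession named `spark`

// Hypothetical case class standing in for named_struct('name', ...).
case class Person(name: String)

val df = Seq(Tuple1(Seq(Person("Bob"), Person("Alice")))).toDF("people")
val json = df.select(to_json($"people")).first.getString(0)
// json: String = [{"name":"Bob"},{"name":"Alice"}]
```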
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 376b86ea69..f9121e60f3 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1774,10 +1774,11 @@ def json_tuple(col, *fields):
def from_json(col, schema, options={}):
"""
Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]]
- with the specified schema. Returns `null`, in the case of an unparseable string.
+ of [[StructType]]s with the specified schema. Returns `null`, in the case of an unparseable
+ string.
:param col: string column in json format
- :param schema: a StructType or ArrayType to use when parsing the json column
+ :param schema: a StructType or ArrayType of StructType to use when parsing the json column
:param options: options to control parsing. accepts the same options as the json datasource
>>> from pyspark.sql.types import *
@@ -1802,10 +1803,10 @@ def from_json(col, schema, options={}):
@since(2.1)
def to_json(col, options={}):
"""
- Converts a column containing a [[StructType]] into a JSON string. Throws an exception,
- in the case of an unsupported type.
+ Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a
+ JSON string. Throws an exception, in the case of an unsupported type.
- :param col: name of column containing the struct
+ :param col: name of column containing the struct or array of structs
:param options: options to control converting. accepts the same options as the json datasource
>>> from pyspark.sql import Row
@@ -1814,6 +1815,10 @@ def to_json(col, options={}):
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"age":2,"name":"Alice"}')]
+ >>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])]
+ >>> df = spark.createDataFrame(data, ("key", "value"))
+ >>> df.select(to_json(df.value).alias("json")).collect()
+ [Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
"""
sc = SparkContext._active_spark_context
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 0486e67dbd..e1d83a86f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -425,8 +425,8 @@ object FunctionRegistry {
expression[BitwiseXor]("^"),
// json
- expression[StructToJson]("to_json"),
- expression[JsonToStruct]("from_json"),
+ expression[StructsToJson]("to_json"),
+ expression[JsonToStructs]("from_json"),
// Cast aliases (SPARK-16730)
castAlias("boolean", BooleanType),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 37e4bb5060..e4e08a8665 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -482,7 +482,8 @@ case class JsonTuple(children: Seq[Expression])
}
/**
- * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
+ * Converts a JSON input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s
+ * with the specified schema.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
@@ -495,7 +496,7 @@ case class JsonTuple(children: Seq[Expression])
{"time":"2015-08-26 00:00:00.0"}
""")
// scalastyle:on line.size.limit
-case class JsonToStruct(
+case class JsonToStructs(
schema: DataType,
options: Map[String, String],
child: Expression,
@@ -590,7 +591,7 @@ case class JsonToStruct(
}
/**
- * Converts a [[StructType]] to a json output string.
+ * Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json output string.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
@@ -601,9 +602,11 @@ case class JsonToStruct(
{"a":1,"b":2}
> SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
{"time":"26/08/2015"}
+ > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)));
+ [{"a":1,"b":2}]
""")
// scalastyle:on line.size.limit
-case class StructToJson(
+case class StructsToJson(
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
@@ -624,41 +627,58 @@ case class StructToJson(
lazy val writer = new CharArrayWriter()
@transient
- lazy val gen =
- new JacksonGenerator(
- child.dataType.asInstanceOf[StructType],
- writer,
- new JSONOptions(options, timeZoneId.get))
+ lazy val gen = new JacksonGenerator(
+ rowSchema, writer, new JSONOptions(options, timeZoneId.get))
+
+ @transient
+ lazy val rowSchema = child.dataType match {
+ case st: StructType => st
+ case ArrayType(st: StructType, _) => st
+ }
+
+ // This converts rows to the JSON output according to the given schema.
+ @transient
+ lazy val converter: Any => UTF8String = {
+ def getAndReset(): UTF8String = {
+ gen.flush()
+ val json = writer.toString
+ writer.reset()
+ UTF8String.fromString(json)
+ }
+
+ child.dataType match {
+ case _: StructType =>
+ (row: Any) =>
+ gen.write(row.asInstanceOf[InternalRow])
+ getAndReset()
+ case ArrayType(_: StructType, _) =>
+ (arr: Any) =>
+ gen.write(arr.asInstanceOf[ArrayData])
+ getAndReset()
+ }
+ }
override def dataType: DataType = StringType
- override def checkInputDataTypes(): TypeCheckResult = {
- if (StructType.acceptsType(child.dataType)) {
+ override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+ case _: StructType | ArrayType(_: StructType, _) =>
try {
- JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
+ JacksonUtils.verifySchema(rowSchema)
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
- } else {
- TypeCheckResult.TypeCheckFailure(
- s"$prettyName requires that the expression is a struct expression.")
- }
+ case _ => TypeCheckResult.TypeCheckFailure(
+ s"Input type ${child.dataType.simpleString} must be a struct or array of structs.")
}
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
- override def nullSafeEval(row: Any): Any = {
- gen.write(row.asInstanceOf[InternalRow])
- gen.flush()
- val json = writer.toString
- writer.reset()
- UTF8String.fromString(json)
- }
+ override def nullSafeEval(value: Any): Any = converter(value)
- override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+ override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil
}
object JsonExprUtils {
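One detail worth noting in the `converter` refactor above: the match on `child.dataType` runs once when the lazy val is forced, so per-row evaluation in `nullSafeEval` is just an application of the cached closure. A standalone sketch of that dispatch pattern, using hypothetical stand-in types rather than Spark's internals:

```scala
// Hypothetical miniature of StructsToJson.converter: choose the writing
// strategy once from the input type, then reuse the closure for every row.
sealed trait DType
case object StructT extends DType
final case class ArrayT(element: DType) extends DType

def makeConverter(dt: DType): Any => String = dt match {
  // Single struct: emit one JSON object.
  case StructT =>
    (row: Any) => s"""{"value":$row}"""
  // Array of structs: wrap the per-row output in a JSON array.
  case ArrayT(StructT) =>
    (rows: Any) =>
      rows.asInstanceOf[Seq[Any]].map(r => s"""{"value":$r}""").mkString("[", ",", "]")
  case other =>
    throw new UnsupportedOperationException(s"Unsupported type: $other")
}

val convert = makeConverter(ArrayT(StructT))
println(convert(Seq(1, 2)))  // prints [{"value":1},{"value":2}]
```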
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index dec55279c9..1d302aea6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -37,6 +37,10 @@ private[sql] class JacksonGenerator(
// `ValueWriter`s for all fields of the schema
private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
+ // `ValueWriter` for array data storing rows of the schema.
+ private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: Int) => {
+ writeObject(writeFields(arr.getStruct(i, schema.length), schema, rootFieldWriters))
+ }
private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
@@ -185,17 +189,18 @@ private[sql] class JacksonGenerator(
def flush(): Unit = gen.flush()
/**
- * Transforms a single InternalRow to JSON using Jackson
+ * Transforms a single `InternalRow` to a JSON object using Jackson
*
* @param row The row to convert
*/
- def write(row: InternalRow): Unit = {
- writeObject {
- writeFields(row, schema, rootFieldWriters)
- }
- }
+ def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
- def writeLineEnding(): Unit = {
- gen.writeRaw('\n')
- }
+ /**
+ * Transforms multiple `InternalRow`s to a JSON array using Jackson
+ *
+ * @param array The array of rows to convert
+ */
+ def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
+
+ def writeLineEnding(): Unit = gen.writeRaw('\n')
}
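The two `write` overloads above share `rootFieldWriters`; only the outer framing differs (one object vs. an array of objects). A self-contained sketch of that framing split with plain Jackson (the Jackson calls are real, but the helper names are hypothetical and this is not Spark's `JacksonGenerator`):

```scala
import java.io.StringWriter
import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

// Hedged sketch: one shared per-row writer, wrapped either directly (object)
// or inside start/end array tokens (array), as in the overloads above.
def writeRow(gen: JsonGenerator, fields: Map[String, Int]): Unit = {
  gen.writeStartObject()
  fields.foreach { case (name, value) => gen.writeNumberField(name, value) }
  gen.writeEndObject()
}

val out = new StringWriter()
val gen = new JsonFactory().createGenerator(out)

gen.writeStartArray()  // array framing around the shared row writer
Seq(Map("a" -> 1), Map("a" -> 2)).foreach(writeRow(gen, _))
gen.writeEndArray()
gen.flush()

println(out.toString)  // prints [{"a":1},{"a":2}]
```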
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 19d0c8eb92..e4698d4463 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -21,7 +21,7 @@ import java.util.Calendar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -352,7 +352,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+ JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
InternalRow(1)
)
}
@@ -361,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+ JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
null
)
// Other modes should still return `null`.
checkEvaluation(
- JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
+ JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
null
)
}
@@ -376,62 +376,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: InternalRow(2) :: Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=object, schema=array, output=array of single row") {
val input = """{"a": 1}"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty array, schema=array, output=empty array") {
val input = "[ ]"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty object, schema=array, output=array of single row with null") {
val input = "{ }"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(null) :: Nil
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=array of single object, schema=struct, output=single row") {
val input = """[{"a": 1}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(1)
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=array, schema=struct, output=null") {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty array, schema=struct, output=null") {
val input = """[]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json - input=empty object, schema=struct, output=single row with null") {
val input = """{ }"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(null)
- checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}
test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
+ JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
null
)
}
@@ -444,14 +444,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
c.set(2016, 0, 1, 0, 0, 0)
c.set(Calendar.MILLISECOND, 123)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+ JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
InternalRow(c.getTimeInMillis * 1000L)
)
// The result doesn't change because the json string includes timezone string ("Z" here),
// which means the string represents the timestamp string in the timezone regardless of
// the timeZoneId parameter.
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+ JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
InternalRow(c.getTimeInMillis * 1000L)
)
@@ -461,7 +461,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
c.set(2016, 0, 1, 0, 0, 0)
c.set(Calendar.MILLISECOND, 0)
checkEvaluation(
- JsonToStruct(
+ JsonToStructs(
schema,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
Literal(jsonData2),
@@ -469,7 +469,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
InternalRow(c.getTimeInMillis * 1000L)
)
checkEvaluation(
- JsonToStruct(
+ JsonToStructs(
schema,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
@@ -483,25 +483,52 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("SPARK-19543: from_json empty input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
+ JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
null
)
}
- test("to_json") {
+ test("to_json - struct") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(create_row(1), schema)
checkEvaluation(
- StructToJson(Map.empty, struct, gmtId),
+ StructsToJson(Map.empty, struct, gmtId),
"""{"a":1}"""
)
}
+ test("to_json - array") {
+ val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
+ val output = """[{"a":1},{"a":2}]"""
+ checkEvaluation(
+ StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+ output)
+ }
+
+ test("to_json - array with single empty row") {
+ val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val input = new GenericArrayData(InternalRow(null) :: Nil)
+ val output = """[{}]"""
+ checkEvaluation(
+ StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+ output)
+ }
+
+ test("to_json - empty array") {
+ val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val input = new GenericArrayData(Nil)
+ val output = """[]"""
+ checkEvaluation(
+ StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+ output)
+ }
+
test("to_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(null, schema)
checkEvaluation(
- StructToJson(Map.empty, struct, gmtId),
+ StructsToJson(Map.empty, struct, gmtId),
null
)
}
@@ -514,16 +541,16 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
checkEvaluation(
- StructToJson(Map.empty, struct, gmtId),
+ StructsToJson(Map.empty, struct, gmtId),
"""{"t":"2016-01-01T00:00:00.000Z"}"""
)
checkEvaluation(
- StructToJson(Map.empty, struct, Option("PST")),
+ StructsToJson(Map.empty, struct, Option("PST")),
"""{"t":"2015-12-31T16:00:00.000-08:00"}"""
)
checkEvaluation(
- StructToJson(
+ StructsToJson(
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> gmtId.get),
struct,
@@ -531,7 +558,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
"""{"t":"2016-01-01T00:00:00"}"""
)
checkEvaluation(
- StructToJson(
+ StructsToJson(
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> "PST"),
struct,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 201f726db3..a9f089c850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2978,7 +2978,8 @@ object functions {
/**
* (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
- * with the specified schema. Returns `null`, in the case of an unparseable string.
+ * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
+ * string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
@@ -2989,7 +2990,7 @@ object functions {
* @since 2.2.0
*/
def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
- JsonToStruct(schema, options, e.expr)
+ JsonToStructs(schema, options, e.expr)
}
/**
@@ -3009,7 +3010,8 @@ object functions {
/**
* (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
- * with the specified schema. Returns `null`, in the case of an unparseable string.
+ * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
+ * string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
@@ -3036,7 +3038,7 @@ object functions {
from_json(e, schema, Map.empty[String, String])
/**
- * Parses a column containing a JSON string into a `StructType` or `ArrayType`
+ * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s
* with the specified schema. Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3049,7 +3051,7 @@ object functions {
from_json(e, schema, Map.empty[String, String])
/**
- * Parses a column containing a JSON string into a `StructType` or `ArrayType`
+ * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s
* with the specified schema. Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3062,10 +3064,11 @@ object functions {
from_json(e, DataType.fromJson(schema), options)
/**
- * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the
- * specified schema. Throws an exception, in the case of an unsupported type.
+ * (Scala-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
+ * into a JSON string with the specified schema. Throws an exception, in the case of an
+ * unsupported type.
*
- * @param e a struct column.
+ * @param e a column containing a struct or an array of structs.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options as the json data source.
*
@@ -3073,14 +3076,15 @@ object functions {
* @since 2.1.0
*/
def to_json(e: Column, options: Map[String, String]): Column = withExpr {
- StructToJson(options, e.expr)
+ StructsToJson(options, e.expr)
}
/**
- * (Java-specific) Converts a column containing a `StructType` into a JSON string with the
- * specified schema. Throws an exception, in the case of an unsupported type.
+ * (Java-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
+ * into a JSON string with the specified schema. Throws an exception, in the case of an
+ * unsupported type.
*
- * @param e a struct column.
+ * @param e a column containing a struct or an array of structs.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options as the json data source.
*
@@ -3091,10 +3095,10 @@ object functions {
to_json(e, options.asScala.toMap)
/**
- * Converts a column containing a `StructType` into a JSON string with the
- * specified schema. Throws an exception, in the case of an unsupported type.
+ * Converts a column containing a `StructType` or `ArrayType` of `StructType`s into a JSON string
+ * with the specified schema. Throws an exception, in the case of an unsupported type.
*
- * @param e a struct column.
+ * @param e a column containing a struct or an array of structs.
*
* @group collection_funcs
* @since 2.1.0
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 83243c5e5a..b3cc2cea51 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -3,6 +3,7 @@ describe function to_json;
describe function extended to_json;
select to_json(named_struct('a', 1, 'b', 2));
select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+select to_json(array(named_struct('a', 1, 'b', 2)));
-- Check if errors handled
select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
select to_json(named_struct('a', 1, 'b', 2), map('mode', 1));
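The error cases in this file surface as `AnalysisException`s at analysis time. A hedged sketch of observing one of them from Scala (assumes an active `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.AnalysisException

// Options must be given as a map() literal; a named_struct is rejected.
try {
  spark.sql("SELECT to_json(named_struct('a', 1), named_struct('mode', 'PERMISSIVE'))").collect()
} catch {
  case e: AnalysisException =>
    println(e.getMessage)  // Must use a map() function for options;; line 1 pos 7
}
```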
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index b57cbbc1d8..315e1730ce 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17
-- !query 0
@@ -7,7 +7,7 @@ describe function to_json
-- !query 0 schema
struct<function_desc:string>
-- !query 0 output
-Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
Function: to_json
Usage: to_json(expr[, options]) - Returns a json string with a given struct value
@@ -17,13 +17,15 @@ describe function extended to_json
-- !query 1 schema
struct<function_desc:string>
-- !query 1 output
-Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
Extended Usage:
Examples:
> SELECT to_json(named_struct('a', 1, 'b', 2));
{"a":1,"b":2}
> SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
{"time":"26/08/2015"}
+ > SELECT to_json(array(named_struct('a', 1, 'b', 2)));
+ [{"a":1,"b":2}]
Function: to_json
Usage: to_json(expr[, options]) - Returns a json string with a given struct value
@@ -32,7 +34,7 @@ Usage: to_json(expr[, options]) - Returns a json string with a given struct valu
-- !query 2
select to_json(named_struct('a', 1, 'b', 2))
-- !query 2 schema
-struct<structtojson(named_struct(a, 1, b, 2)):string>
+struct<structstojson(named_struct(a, 1, b, 2)):string>
-- !query 2 output
{"a":1,"b":2}
@@ -40,54 +42,62 @@ struct<structtojson(named_struct(a, 1, b, 2)):string>
-- !query 3
select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))
-- !query 3 schema
-struct<structtojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string>
+struct<structstojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string>
-- !query 3 output
{"time":"26/08/2015"}
-- !query 4
-select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
+select to_json(array(named_struct('a', 1, 'b', 2)))
-- !query 4 schema
-struct<>
+struct<structstojson(array(named_struct(a, 1, b, 2))):string>
-- !query 4 output
-org.apache.spark.sql.AnalysisException
-Must use a map() function for options;; line 1 pos 7
+[{"a":1,"b":2}]
-- !query 5
-select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
+select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
-A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
+Must use a map() function for options;; line 1 pos 7
-- !query 6
-select to_json()
+select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
-- !query 6 schema
struct<>
-- !query 6 output
org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function to_json; line 1 pos 7
+A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
-- !query 7
-describe function from_json
+select to_json()
-- !query 7 schema
-struct<function_desc:string>
+struct<>
-- !query 7 output
-Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
-Function: from_json
-Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
+org.apache.spark.sql.AnalysisException
+Invalid number of arguments for function to_json; line 1 pos 7
-- !query 8
-describe function extended from_json
+describe function from_json
-- !query 8 schema
struct<function_desc:string>
-- !query 8 output
-Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs
+Function: from_json
+Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
+
+
+-- !query 9
+describe function extended from_json
+-- !query 9 schema
+struct<function_desc:string>
+-- !query 9 output
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs
Extended Usage:
Examples:
> SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
@@ -99,36 +109,36 @@ Function: from_json
Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
--- !query 9
+-- !query 10
select from_json('{"a":1}', 'a INT')
--- !query 9 schema
-struct<jsontostruct({"a":1}):struct<a:int>>
--- !query 9 output
+-- !query 10 schema
+struct<jsontostructs({"a":1}):struct<a:int>>
+-- !query 10 output
{"a":1}
--- !query 10
+-- !query 11
select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))
--- !query 10 schema
-struct<jsontostruct({"time":"26/08/2015"}):struct<time:timestamp>>
--- !query 10 output
+-- !query 11 schema
+struct<jsontostructs({"time":"26/08/2015"}):struct<time:timestamp>>
+-- !query 11 output
{"time":2015-08-26 00:00:00.0}
--- !query 11
+-- !query 12
select from_json('{"a":1}', 1)
--- !query 11 schema
+-- !query 12 schema
struct<>
--- !query 11 output
+-- !query 12 output
org.apache.spark.sql.AnalysisException
Expected a string literal instead of 1;; line 1 pos 7
--- !query 12
+-- !query 13
select from_json('{"a":1}', 'a InvalidType')
--- !query 12 schema
+-- !query 13 schema
struct<>
--- !query 12 output
+-- !query 13 output
org.apache.spark.sql.AnalysisException
DataType invalidtype() is not supported.(line 1, pos 2)
@@ -139,28 +149,28 @@ a InvalidType
; line 1 pos 7
--- !query 13
+-- !query 14
select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'))
--- !query 13 schema
+-- !query 14 schema
struct<>
--- !query 13 output
+-- !query 14 output
org.apache.spark.sql.AnalysisException
Must use a map() function for options;; line 1 pos 7
--- !query 14
+-- !query 15
select from_json('{"a":1}', 'a INT', map('mode', 1))
--- !query 14 schema
+-- !query 15 schema
struct<>
--- !query 14 output
+-- !query 15 output
org.apache.spark.sql.AnalysisException
A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
--- !query 15
+-- !query 16
select from_json()
--- !query 15 schema
+-- !query 16 schema
struct<>
--- !query 15 output
+-- !query 16 output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function from_json; line 1 pos 7
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 2345b82081..170c238c53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -156,7 +156,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Seq(Row(1, "a"), Row(2, null), Row(null, null))))
}
- test("to_json") {
+ test("to_json - struct") {
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
checkAnswer(
@@ -164,6 +164,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row("""{"_1":1}""") :: Nil)
}
+ test("to_json - array") {
+ val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
+
+ checkAnswer(
+ df.select(to_json($"a")),
+ Row("""[{"_1":1}]""") :: Nil)
+ }
+
test("to_json with option") {
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
@@ -184,7 +192,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
"Unable to convert column a of type calendarinterval to JSON."))
}
- test("roundtrip in to_json and from_json") {
+ test("roundtrip in to_json and from_json - struct") {
val dfOne = Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("struct")
val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType]
val readBackOne = dfOne.select(to_json($"struct").as("json"))
@@ -198,6 +206,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(dfTwo, readBackTwo)
}
+ test("roundtrip in to_json and from_json - array") {
+ val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array")
+ val schemaOne = dfOne.schema(0).dataType
+ val readBackOne = dfOne.select(to_json($"array").as("json"))
+ .select(from_json($"json", schemaOne).as("array"))
+ checkAnswer(dfOne, readBackOne)
+
+ val dfTwo = Seq(Some("""[{"a":1}]"""), None).toDF("json")
+ val schemaTwo = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val readBackTwo = dfTwo.select(from_json($"json", schemaTwo).as("array"))
+ .select(to_json($"array").as("json"))
+ checkAnswer(dfTwo, readBackTwo)
+ }
+
test("SPARK-19637 Support to_json in SQL") {
val df1 = Seq(Tuple1(Tuple1(1))).toDF("a")
checkAnswer(