path: root/sql/core/src
author    hyukjinkwon <gurwls223@gmail.com>    2016-11-01 12:46:41 -0700
committer Michael Armbrust <michael@databricks.com>    2016-11-01 12:46:41 -0700
commit    01dd0083011741c2bbe5ae1d2a25f2c9a1302b76 (patch)
tree      7b9993165b1a4f48e64d566d93c7883a3096403d /sql/core/src
parent    cfac17ee1cec414663b957228e469869eb7673c1 (diff)
[SPARK-17764][SQL] Add `to_json` supporting to convert nested struct column to JSON string
## What changes were proposed in this pull request?

This PR proposes to add a `to_json` function, as the counterpart of `from_json`, in Scala, Java and Python. It is useful to be able to convert the same column both from and to JSON. Also, some data sources do not support nested types; if we are forced to save a dataframe into such a data source, this function offers a workaround. The usage is as below:

``` scala
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
df.select(to_json($"a").as("json")).show()
```

``` bash
+--------+
|    json|
+--------+
|{"_1":1}|
+--------+
```

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15354 from HyukjinKwon/SPARK-17764.
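For readers who want a runnable round trip, the sketch below combines the new `to_json` with the existing `from_json` against the 2.1 API shown in this diff. The session setup, application name, and column names are illustrative and not part of the patch.

``` scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, to_json}
import org.apache.spark.sql.types.{IntegerType, StructType}

object ToJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("to_json demo").getOrCreate()
    import spark.implicits._

    // Serialize a nested struct column into a JSON string column.
    val df = Seq(Tuple1(Tuple1(1))).toDF("a")
    val asJson = df.select(to_json($"a").as("json"))
    asJson.show() // prints {"_1":1}

    // Parse the string back into a struct with an explicit schema.
    val schema = new StructType().add("_1", IntegerType)
    asJson.select(from_json($"json", schema).as("a")).show()

    spark.stop()
  }
}
```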
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                                         2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala   198
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala        2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala                                        44
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala                               30
5 files changed, 69 insertions, 207 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6e0a2471e0..eb2b20afc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.json.JacksonGenerator
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -45,7 +46,6 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
deleted file mode 100644
index 5b55b70186..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import java.io.Writer
-
-import com.fasterxml.jackson.core._
-
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.JSONOptions
-import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
-import org.apache.spark.sql.types._
-
-private[sql] class JacksonGenerator(
- schema: StructType,
- writer: Writer,
- options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
- // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
- // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
- // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
- private type ValueWriter = (SpecializedGetters, Int) => Unit
-
- // `ValueWriter`s for all fields of the schema
- private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
-
- private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
-
- private def makeWriter(dataType: DataType): ValueWriter = dataType match {
- case NullType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNull()
-
- case BooleanType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeBoolean(row.getBoolean(ordinal))
-
- case ByteType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNumber(row.getByte(ordinal))
-
- case ShortType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNumber(row.getShort(ordinal))
-
- case IntegerType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNumber(row.getInt(ordinal))
-
- case LongType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNumber(row.getLong(ordinal))
-
- case FloatType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNumber(row.getFloat(ordinal))
-
- case DoubleType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNumber(row.getDouble(ordinal))
-
- case StringType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeString(row.getUTF8String(ordinal).toString)
-
- case TimestampType =>
- (row: SpecializedGetters, ordinal: Int) =>
- val timestampString =
- options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
- gen.writeString(timestampString)
-
- case DateType =>
- (row: SpecializedGetters, ordinal: Int) =>
- val dateString =
- options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
- gen.writeString(dateString)
-
- case BinaryType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeBinary(row.getBinary(ordinal))
-
- case dt: DecimalType =>
- (row: SpecializedGetters, ordinal: Int) =>
- gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal)
-
- case st: StructType =>
- val fieldWriters = st.map(_.dataType).map(makeWriter)
- (row: SpecializedGetters, ordinal: Int) =>
- writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters))
-
- case at: ArrayType =>
- val elementWriter = makeWriter(at.elementType)
- (row: SpecializedGetters, ordinal: Int) =>
- writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
-
- case mt: MapType =>
- val valueWriter = makeWriter(mt.valueType)
- (row: SpecializedGetters, ordinal: Int) =>
- writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
-
- // For UDT values, they should be in the SQL type's corresponding value type.
- // We should not see values in the user-defined class at here.
- // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
- // an ArrayData at here, instead of a Vector.
- case t: UserDefinedType[_] =>
- makeWriter(t.sqlType)
-
- case _ =>
- (row: SpecializedGetters, ordinal: Int) =>
- val v = row.get(ordinal, dataType)
- sys.error(s"Failed to convert value $v (class of ${v.getClass}}) " +
- s"with the type of $dataType to JSON.")
- }
-
- private def writeObject(f: => Unit): Unit = {
- gen.writeStartObject()
- f
- gen.writeEndObject()
- }
-
- private def writeFields(
- row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
- var i = 0
- while (i < row.numFields) {
- val field = schema(i)
- if (!row.isNullAt(i)) {
- gen.writeFieldName(field.name)
- fieldWriters(i).apply(row, i)
- }
- i += 1
- }
- }
-
- private def writeArray(f: => Unit): Unit = {
- gen.writeStartArray()
- f
- gen.writeEndArray()
- }
-
- private def writeArrayData(
- array: ArrayData, fieldWriter: ValueWriter): Unit = {
- var i = 0
- while (i < array.numElements()) {
- if (!array.isNullAt(i)) {
- fieldWriter.apply(array, i)
- } else {
- gen.writeNull()
- }
- i += 1
- }
- }
-
- private def writeMapData(
- map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
- val keyArray = map.keyArray()
- val valueArray = map.valueArray()
- var i = 0
- while (i < map.numElements()) {
- gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
- if (!valueArray.isNullAt(i)) {
- fieldWriter.apply(valueArray, i)
- } else {
- gen.writeNull()
- }
- i += 1
- }
- }
-
- def close(): Unit = gen.close()
-
- def flush(): Unit = gen.flush()
-
- /**
- * Transforms a single InternalRow to JSON using Jackson
- *
- * @param row The row to convert
- */
- def write(row: InternalRow): Unit = {
- writeObject {
- writeFields(row, schema, rootFieldWriters)
- }
- }
-}
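The file above is not dropped but relocated: the import changes in `Dataset.scala` and `JsonFileFormat.scala` show that `JacksonGenerator` now lives under `org.apache.spark.sql.catalyst.json`, where the new `to_json` expression can reuse it. Its central pattern, building one writer closure per field type up front and replaying those closures for every row, can be sketched with plain Jackson as follows. This is a standalone illustration, not the relocated Spark class; the class and field names are made up for the example.

``` scala
import java.io.StringWriter
import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

// Illustrative stand-in for the ValueWriter pattern above: one writer closure is
// built per column type when the generator is constructed, then replayed per row.
// Unsupported types are rejected up front.
class MiniJsonGenerator(names: Seq[String], types: Seq[String]) {
  private type ValueWriter = (JsonGenerator, Any) => Unit

  private val writers: Seq[ValueWriter] = types.map {
    case "int"    => (gen, v) => gen.writeNumber(v.asInstanceOf[Int])
    case "string" => (gen, v) => gen.writeString(v.asInstanceOf[String])
    case "bool"   => (gen, v) => gen.writeBoolean(v.asInstanceOf[Boolean])
    case other    => sys.error(s"Unsupported type: $other")
  }

  def write(row: Seq[Any]): String = {
    val out = new StringWriter()
    val gen = new JsonFactory().createGenerator(out)
    gen.writeStartObject()
    names.indices.foreach { i =>
      // Null fields are skipped rather than written as explicit nulls, as in the original.
      if (row(i) != null) {
        gen.writeFieldName(names(i))
        writers(i)(gen, row(i))
      }
    }
    gen.writeEndObject()
    gen.close()
    out.toString
  }
}
```

For example, `new MiniJsonGenerator(Seq("name", "age"), Seq("string", "int")).write(Seq("alice", 5))` produces `{"name":"alice","age":5}`.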
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 5a409c04c9..0e38aefecb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5f1efd22d8..944a476114 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2883,10 +2883,10 @@ object functions {
* (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
* specified schema. Returns `null`, in the case of an unparseable string.
*
+ * @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. accepts the same options and the
* json data source.
- * @param e a string column containing JSON data.
*
* @group collection_funcs
* @since 2.1.0
@@ -2936,6 +2936,48 @@ object functions {
def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
+ /**
+ * (Scala-specific) Converts a column containing a [[StructType]] into a JSON string with the
+ * specified schema. Throws an exception, in the case of an unsupported type.
+ *
+ * @param e a struct column.
+ * @param options options to control how the struct column is converted into a json string.
+ * accepts the same options and the json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def to_json(e: Column, options: Map[String, String]): Column = withExpr {
+ StructToJson(options, e.expr)
+ }
+
+ /**
+ * (Java-specific) Converts a column containing a [[StructType]] into a JSON string with the
+ * specified schema. Throws an exception, in the case of an unsupported type.
+ *
+ * @param e a struct column.
+ * @param options options to control how the struct column is converted into a json string.
+ * accepts the same options and the json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def to_json(e: Column, options: java.util.Map[String, String]): Column =
+ to_json(e, options.asScala.toMap)
+
+ /**
+ * Converts a column containing a [[StructType]] into a JSON string with the
+ * specified schema. Throws an exception, in the case of an unsupported type.
+ *
+ * @param e a struct column.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def to_json(e: Column): Column =
+ to_json(e, Map.empty[String, String])
+
/**
* Returns length of array or map.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 518d6e92b2..59ae889cf3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql
-import org.apache.spark.sql.functions.from_json
+import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType}
class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -31,7 +31,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row("alice", "5"))
}
-
val tuples: Seq[(String, String)] =
("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
@@ -97,7 +96,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}
- test("json_parser") {
+ test("from_json") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)
@@ -106,7 +105,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Row(1)) :: Nil)
}
- test("json_parser missing columns") {
+ test("from_json missing columns") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("b", IntegerType)
@@ -115,7 +114,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Row(null)) :: Nil)
}
- test("json_parser invalid json") {
+ test("from_json invalid json") {
val df = Seq("""{"a" 1}""").toDS()
val schema = new StructType().add("a", IntegerType)
@@ -123,4 +122,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
df.select(from_json($"value", schema)),
Row(null) :: Nil)
}
+
+ test("to_json") {
+ val df = Seq(Tuple1(Tuple1(1))).toDF("a")
+
+ checkAnswer(
+ df.select(to_json($"a")),
+ Row("""{"_1":1}""") :: Nil)
+ }
+
+ test("to_json unsupported type") {
+ val df = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
+ .select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c"))
+ val e = intercept[AnalysisException]{
+ // Unsupported type throws an exception
+ df.select(to_json($"c")).collect()
+ }
+ assert(e.getMessage.contains(
+ "Unable to convert column a of type calendarinterval to JSON."))
+ }
}
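As the last test shows, `to_json` rejects types the JSON writer cannot express, and the rejection surfaces as an `AnalysisException`. A caller handling arbitrary struct schemas could guard for this as sketched below; the helper name is hypothetical and only illustrates catching the same exception the test intercepts.

``` scala
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.functions.to_json

object JsonColumnGuard {
  // Illustrative guard: add a JSON column for `structCol`, falling back to the
  // original frame if the struct holds a type the JSON writer rejects. As in the
  // test above, the rejection surfaces as an AnalysisException.
  def withJsonColumn(df: DataFrame, structCol: String): DataFrame =
    try {
      df.withColumn(s"${structCol}_json", to_json(df(structCol)))
    } catch {
      case _: AnalysisException => df
    }
}
```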