author     hyukjinkwon <gurwls223@gmail.com>          2016-11-01 12:46:41 -0700
committer  Michael Armbrust <michael@databricks.com>  2016-11-01 12:46:41 -0700
commit     01dd0083011741c2bbe5ae1d2a25f2c9a1302b76 (patch)
tree       7b9993165b1a4f48e64d566d93c7883a3096403d /sql/catalyst
parent     cfac17ee1cec414663b957228e469869eb7673c1 (diff)
[SPARK-17764][SQL] Add `to_json` to support converting a nested struct column to a JSON string
## What changes were proposed in this pull request?

This PR proposes to add a `to_json` function, as the counterpart of `from_json`, in Scala, Java and Python. It'd be useful to be able to convert the same column both from and to JSON. Also, some data sources do not support nested types; if we have to save a dataframe into such a data source, this function offers a workaround. The usage is as below:

``` scala
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
df.select(to_json($"a").as("json")).show()
```

``` bash
+--------+
|    json|
+--------+
|{"_1":1}|
+--------+
```

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15354 from HyukjinKwon/SPARK-17764.
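Since the point of the change is being able to convert the same column both to and from JSON, here is a minimal round-trip sketch (an illustration, not part of the patch; it assumes a `spark-shell` session with `spark.implicits._` and `org.apache.spark.sql.functions._` in scope, and relies on the existing `from_json` function):

``` scala
import org.apache.spark.sql.types._

val df = Seq(Tuple1(Tuple1(1))).toDF("a")

// Schema of the JSON produced by to_json on column "a"; the field is named "_1"
// because the column is encoded from a Tuple1.
val schema = new StructType().add("_1", IntegerType)

df.select(to_json($"a").as("json"))               // struct -> JSON string
  .select(from_json($"json", schema).as("back"))  // JSON string -> struct
  .show()
// expected to show a single struct row equivalent to the original column "a"
```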
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala      |  48
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala            | 197
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala                |  26
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala |   9
4 files changed, 278 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 65dbd6a4e3..244a5a34f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -17,16 +17,17 @@
package org.apache.spark.sql.catalyst.expressions
-import java.io.{ByteArrayOutputStream, StringWriter}
+import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}
import scala.util.parsing.combinator.RegexParsers
import com.fasterxml.jackson.core._
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -494,3 +495,46 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
}
+
+/**
+ * Converts a [[StructType]] to a json output string.
+ */
+case class StructToJson(options: Map[String, String], child: Expression)
+ extends Expression with CodegenFallback with ExpectsInputTypes {
+ override def nullable: Boolean = true
+
+ @transient
+ lazy val writer = new CharArrayWriter()
+
+ @transient
+ lazy val gen =
+ new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer)
+
+ override def dataType: DataType = StringType
+ override def children: Seq[Expression] = child :: Nil
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ if (StructType.acceptsType(child.dataType)) {
+ try {
+ JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
+ TypeCheckResult.TypeCheckSuccess
+ } catch {
+ case e: UnsupportedOperationException =>
+ TypeCheckResult.TypeCheckFailure(e.getMessage)
+ }
+ } else {
+ TypeCheckResult.TypeCheckFailure(
+ s"$prettyName requires that the expression is a struct expression.")
+ }
+ }
+
+ override def eval(input: InternalRow): Any = {
+ gen.write(child.eval(input).asInstanceOf[InternalRow])
+ gen.flush()
+ val json = writer.toString
+ writer.reset()
+ UTF8String.fromString(json)
+ }
+
+ override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+}
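The user-facing `to_json` API lives under `sql/core`, which is outside this diffstat (limited to `sql/catalyst`). Purely as an illustration of how `StructToJson` is meant to be driven from the DataFrame API, a rough, hypothetical sketch of such a wrapper might look like this (not the committed code):

``` scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.StructToJson

// Hypothetical wrappers: wrap the catalyst expression in a Column so it can be
// used as df.select(to_json($"a")).
def to_json(e: Column, options: Map[String, String]): Column =
  new Column(StructToJson(options, e.expr))

def to_json(e: Column): Column = to_json(e, Map.empty[String, String])
```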
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
new file mode 100644
index 0000000000..4b548e0e7f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.Writer
+
+import com.fasterxml.jackson.core._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.types._
+
+private[sql] class JacksonGenerator(
+ schema: StructType,
+ writer: Writer,
+ options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+ // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
+ // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
+ // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
+ private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+ // `ValueWriter`s for all fields of the schema
+ private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
+
+ private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+ private def makeWriter(dataType: DataType): ValueWriter = dataType match {
+ case NullType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNull()
+
+ case BooleanType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeBoolean(row.getBoolean(ordinal))
+
+ case ByteType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getByte(ordinal))
+
+ case ShortType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getShort(ordinal))
+
+ case IntegerType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getInt(ordinal))
+
+ case LongType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getLong(ordinal))
+
+ case FloatType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getFloat(ordinal))
+
+ case DoubleType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getDouble(ordinal))
+
+ case StringType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeString(row.getUTF8String(ordinal).toString)
+
+ case TimestampType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val timestampString =
+ options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+ gen.writeString(timestampString)
+
+ case DateType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val dateString =
+ options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+ gen.writeString(dateString)
+
+ case BinaryType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeBinary(row.getBinary(ordinal))
+
+ case dt: DecimalType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal)
+
+ case st: StructType =>
+ val fieldWriters = st.map(_.dataType).map(makeWriter)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters))
+
+ case at: ArrayType =>
+ val elementWriter = makeWriter(at.elementType)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
+
+ case mt: MapType =>
+ val valueWriter = makeWriter(mt.valueType)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
+
+ // For UDT values, they should be in the SQL type's corresponding value type.
+ // We should not see values in the user-defined class at here.
+ // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
+ // an ArrayData at here, instead of a Vector.
+ case t: UserDefinedType[_] =>
+ makeWriter(t.sqlType)
+
+ case _ =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val v = row.get(ordinal, dataType)
+        sys.error(s"Failed to convert value $v (class of ${v.getClass}) " +
+ s"with the type of $dataType to JSON.")
+ }
+
+ private def writeObject(f: => Unit): Unit = {
+ gen.writeStartObject()
+ f
+ gen.writeEndObject()
+ }
+
+ private def writeFields(
+ row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+ var i = 0
+ while (i < row.numFields) {
+ val field = schema(i)
+ if (!row.isNullAt(i)) {
+ gen.writeFieldName(field.name)
+ fieldWriters(i).apply(row, i)
+ }
+ i += 1
+ }
+ }
+
+ private def writeArray(f: => Unit): Unit = {
+ gen.writeStartArray()
+ f
+ gen.writeEndArray()
+ }
+
+ private def writeArrayData(
+ array: ArrayData, fieldWriter: ValueWriter): Unit = {
+ var i = 0
+ while (i < array.numElements()) {
+ if (!array.isNullAt(i)) {
+ fieldWriter.apply(array, i)
+ } else {
+ gen.writeNull()
+ }
+ i += 1
+ }
+ }
+
+ private def writeMapData(
+ map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
+ val keyArray = map.keyArray()
+ val valueArray = map.valueArray()
+ var i = 0
+ while (i < map.numElements()) {
+ gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+ if (!valueArray.isNullAt(i)) {
+ fieldWriter.apply(valueArray, i)
+ } else {
+ gen.writeNull()
+ }
+ i += 1
+ }
+ }
+
+ def close(): Unit = gen.close()
+
+ def flush(): Unit = gen.flush()
+
+ /**
+ * Transforms a single InternalRow to JSON using Jackson
+ *
+ * @param row The row to convert
+ */
+ def write(row: InternalRow): Unit = {
+ writeObject {
+ writeFields(row, schema, rootFieldWriters)
+ }
+ }
+}
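To make the behaviour of the new generator concrete, here is an illustrative sketch (not part of this patch) of driving `JacksonGenerator` directly over an `InternalRow`, mirroring what `StructToJson.eval` does above:

``` scala
import java.io.CharArrayWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JacksonGenerator
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

val schema = new StructType().add("id", IntegerType).add("name", StringType)
val writer = new CharArrayWriter()
val gen = new JacksonGenerator(schema, writer)

// String fields are stored as UTF8String inside an InternalRow, so wrap them explicitly.
gen.write(InternalRow(1, UTF8String.fromString("spark")))
gen.flush()

println(writer.toString)  // expected: {"id":1,"name":"spark"}
gen.close()
```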
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index c4d9abb2c0..3b23c6cd28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
+import org.apache.spark.sql.types._
+
object JacksonUtils {
/**
* Advance the parser until a null or a specific token is found
@@ -29,4 +31,28 @@ object JacksonUtils {
case x => x != stopOn
}
}
+
+ /**
+ * Verify if the schema is supported in JSON parsing.
+ */
+ def verifySchema(schema: StructType): Unit = {
+ def verifyType(name: String, dataType: DataType): Unit = dataType match {
+ case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
+ DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>
+
+ case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))
+
+ case at: ArrayType => verifyType(name, at.elementType)
+
+ case mt: MapType => verifyType(name, mt.keyType)
+
+ case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
+
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Unable to convert column $name of type ${dataType.simpleString} to JSON.")
+ }
+
+ schema.foreach(field => verifyType(field.name, field.dataType))
+ }
}
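`verifySchema` is what produces the type-check failure message in `StructToJson.checkInputDataTypes`: it walks the schema recursively and throws for any type without a JSON writer in `JacksonGenerator`. An illustrative sketch (not part of this patch), using `CalendarIntervalType` as an example of a type that falls through to the default case:

``` scala
import scala.util.Try

import org.apache.spark.sql.catalyst.json.JacksonUtils
import org.apache.spark.sql.types._

val supported = new StructType()
  .add("a", ArrayType(IntegerType))
  .add("m", MapType(StringType, DoubleType))

val unsupported = new StructType().add("i", CalendarIntervalType)

JacksonUtils.verifySchema(supported)                           // returns normally
println(Try(JacksonUtils.verifySchema(unsupported)).isFailure) // true: UnsupportedOperationException
```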
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 84623934d9..f9db649bc2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -343,4 +343,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
null
)
}
+
+ test("to_json") {
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val struct = Literal.create(create_row(1), schema)
+ checkEvaluation(
+ StructToJson(Map.empty, struct),
+ """{"a":1}"""
+ )
+ }
}
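The new test covers only a flat struct. A hedged sketch of how a nested-struct case (the scenario named in the commit title) could be exercised in the same style, reusing the suite's existing `create_row` and `checkEvaluation` helpers (illustrative, not part of this patch):

``` scala
test("to_json - nested struct") {
  val innerSchema = StructType(StructField("b", IntegerType) :: Nil)
  val schema = StructType(StructField("a", innerSchema) :: Nil)
  val struct = Literal.create(create_row(create_row(1)), schema)
  checkEvaluation(
    StructToJson(Map.empty, struct),
    """{"a":{"b":1}}"""
  )
}
```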