Diffstat (limited to 'sql/catalyst')
4 files changed, 278 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 65dbd6a4e3..244a5a34f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -17,16 +17,17 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.io.{ByteArrayOutputStream, StringWriter}
+import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}
 
 import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.util.ParseModes
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -494,3 +495,46 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
 }
+
+/**
+ * Converts a [[StructType]] to a json output string.
+ */
+case class StructToJson(options: Map[String, String], child: Expression)
+  extends Expression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient
+  lazy val writer = new CharArrayWriter()
+
+  @transient
+  lazy val gen =
+    new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer)
+
+  override def dataType: DataType = StringType
+  override def children: Seq[Expression] = child :: Nil
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (StructType.acceptsType(child.dataType)) {
+      try {
+        JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException =>
+          TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
+    } else {
+      TypeCheckResult.TypeCheckFailure(
+        s"$prettyName requires that the expression is a struct expression.")
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    gen.write(child.eval(input).asInstanceOf[InternalRow])
+    gen.flush()
+    val json = writer.toString
+    writer.reset()
+    UTF8String.fromString(json)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+}
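The new StructToJson expression can be exercised without a SparkSession. The snippet below is a minimal sketch mirroring the to_json test at the end of this diff; the StructToJsonDemo object is a hypothetical harness (not part of the patch) and assumes a Spark 2.1-era spark-catalyst artifact on the classpath.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, StructToJson}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical demo harness, not part of this patch.
object StructToJsonDemo {
  def main(args: Array[String]): Unit = {
    val schema = StructType(StructField("a", IntegerType) :: Nil)
    // Literal.create passes an InternalRow value through unchanged, so the
    // literal's dataType carries the struct schema the generator needs.
    val struct = Literal.create(InternalRow(1), schema)
    val expr = StructToJson(Map.empty, struct)
    // eval returns a UTF8String; this should print {"a":1}
    println(expr.eval(InternalRow.empty))
  }
}

Because eval reuses one CharArrayWriter and one lazily created JacksonGenerator per expression instance, resetting the writer after each row, repeated evaluation avoids allocating a new generator per row.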
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
new file mode 100644
index 0000000000..4b548e0e7f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.Writer
+
+import com.fasterxml.jackson.core._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.types._
+
+private[sql] class JacksonGenerator(
+    schema: StructType,
+    writer: Writer,
+    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
+  // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
+  // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
+  private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+  // `ValueWriter`s for all fields of the schema
+  private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
+
+  private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+  private def makeWriter(dataType: DataType): ValueWriter = dataType match {
+    case NullType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNull()
+
+    case BooleanType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeBoolean(row.getBoolean(ordinal))
+
+    case ByteType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getByte(ordinal))
+
+    case ShortType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getShort(ordinal))
+
+    case IntegerType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getInt(ordinal))
+
+    case LongType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getLong(ordinal))
+
+    case FloatType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getFloat(ordinal))
+
+    case DoubleType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getDouble(ordinal))
+
+    case StringType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeString(row.getUTF8String(ordinal).toString)
+
+    case TimestampType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val timestampString =
+          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        gen.writeString(timestampString)
+
+    case DateType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val dateString =
+          options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+        gen.writeString(dateString)
+
+    case BinaryType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeBinary(row.getBinary(ordinal))
+
+    case dt: DecimalType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal)
+
+    case st: StructType =>
+      val fieldWriters = st.map(_.dataType).map(makeWriter)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters))
+
+    case at: ArrayType =>
+      val elementWriter = makeWriter(at.elementType)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
+
+    case mt: MapType =>
+      val valueWriter = makeWriter(mt.valueType)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
+
+    // For UDT values, they should be in the SQL type's corresponding value type.
+    // We should not see values of the user-defined class here.
+    // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
+    // an ArrayData here, instead of a Vector.
+    case t: UserDefinedType[_] =>
+      makeWriter(t.sqlType)
+
+    case _ =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val v = row.get(ordinal, dataType)
+        sys.error(s"Failed to convert value $v (class of ${v.getClass}) " +
+          s"with the type of $dataType to JSON.")
+  }
+
+  private def writeObject(f: => Unit): Unit = {
+    gen.writeStartObject()
+    f
+    gen.writeEndObject()
+  }
+
+  private def writeFields(
+      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    var i = 0
+    while (i < row.numFields) {
+      val field = schema(i)
+      if (!row.isNullAt(i)) {
+        gen.writeFieldName(field.name)
+        fieldWriters(i).apply(row, i)
+      }
+      i += 1
+    }
+  }
+
+  private def writeArray(f: => Unit): Unit = {
+    gen.writeStartArray()
+    f
+    gen.writeEndArray()
+  }
+
+  private def writeArrayData(
+      array: ArrayData, fieldWriter: ValueWriter): Unit = {
+    var i = 0
+    while (i < array.numElements()) {
+      if (!array.isNullAt(i)) {
+        fieldWriter.apply(array, i)
+      } else {
+        gen.writeNull()
+      }
+      i += 1
+    }
+  }
+
+  private def writeMapData(
+      map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
+    val keyArray = map.keyArray()
+    val valueArray = map.valueArray()
+    var i = 0
+    while (i < map.numElements()) {
+      gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+      if (!valueArray.isNullAt(i)) {
+        fieldWriter.apply(valueArray, i)
+      } else {
+        gen.writeNull()
+      }
+      i += 1
+    }
+  }
+
+  def close(): Unit = gen.close()
+
+  def flush(): Unit = gen.flush()
+
+  /**
+   * Transforms a single InternalRow to JSON using Jackson.
+   *
+   * @param row The row to convert
+   */
+  def write(row: InternalRow): Unit = {
+    writeObject {
+      writeFields(row, schema, rootFieldWriters)
+    }
+  }
+}
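Since JacksonGenerator is private[sql], it is only reachable from code compiled under an org.apache.spark.sql subpackage; the package and object names below are invented for this rough usage sketch.

package org.apache.spark.sql.demo  // assumption: required for private[sql] access

import java.io.CharArrayWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JacksonGenerator
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object JacksonGeneratorDemo {
  def main(args: Array[String]): Unit = {
    val schema = StructType(StructField("name", StringType) ::
      StructField("age", IntegerType) :: Nil)
    val writer = new CharArrayWriter()
    val gen = new JacksonGenerator(schema, writer)

    // String columns live in an InternalRow as UTF8String, not java.lang.String.
    gen.write(InternalRow(UTF8String.fromString("alice"), 29))
    gen.flush()
    println(writer.toString) // {"name":"alice","age":29}

    gen.close()
  }
}

Note that writeFields skips null root fields entirely, so a row with a null column yields an object with that key absent rather than "key":null.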
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index c4d9abb2c0..3b23c6cd28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
+import org.apache.spark.sql.types._
+
 object JacksonUtils {
   /**
    * Advance the parser until a null or a specific token is found
@@ -29,4 +31,28 @@ object JacksonUtils {
       case x => x != stopOn
     }
   }
+
+  /**
+   * Verify if the schema is supported for conversion to JSON.
+   */
+  def verifySchema(schema: StructType): Unit = {
+    def verifyType(name: String, dataType: DataType): Unit = dataType match {
+      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
+           DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>
+
+      case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))
+
+      case at: ArrayType => verifyType(name, at.elementType)
+
+      case mt: MapType => verifyType(name, mt.keyType)
+
+      case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
+
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unable to convert column $name of type ${dataType.simpleString} to JSON.")
+    }
+
+    schema.foreach(field => verifyType(field.name, field.dataType))
+  }
 }
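verifySchema walks the schema recursively and rejects any type makeWriter has no branch for, which lets StructToJson.checkInputDataTypes fail at analysis time instead of hitting sys.error row by row. A small sketch of the accept/reject behavior follows; the VerifySchemaDemo object is hypothetical, and CalendarIntervalType serves only as an example of an unsupported type.

import org.apache.spark.sql.catalyst.json.JacksonUtils
import org.apache.spark.sql.types._

object VerifySchemaDemo {
  def main(args: Array[String]): Unit = {
    // Nested structs and arrays of atomic types pass silently.
    JacksonUtils.verifySchema(StructType(
      StructField("xs", ArrayType(DoubleType)) :: Nil))

    // An interval column has no JSON writer, so verification throws.
    val bad = StructType(StructField("i", CalendarIntervalType) :: Nil)
    try {
      JacksonUtils.verifySchema(bad)
    } catch {
      case e: UnsupportedOperationException =>
        // roughly: "Unable to convert column i of type calendarinterval to JSON."
        println(e.getMessage)
    }
  }
}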
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 84623934d9..f9db649bc2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -343,4 +343,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       null
     )
   }
+
+  test("to_json") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val struct = Literal.create(create_row(1), schema)
+    checkEvaluation(
+      StructToJson(Map.empty, struct),
+      """{"a":1}"""
+    )
+  }
 }
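The suite currently covers only a flat single-column struct. A hypothetical follow-up case (not in this patch) that would exercise the recursive struct writer path could look like:

  test("to_json - nested struct") {
    val innerSchema = StructType(StructField("b", IntegerType) :: Nil)
    val schema = StructType(StructField("a", innerSchema) :: Nil)
    val struct = Literal.create(create_row(create_row(1)), schema)
    checkEvaluation(
      StructToJson(Map.empty, struct),
      """{"a":{"b":1}}"""
    )
  }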