author     hyukjinkwon <gurwls223@gmail.com>        2016-07-18 09:49:14 -0700
committer  Yin Huai <yhuai@databricks.com>          2016-07-18 09:49:14 -0700
commit     2877f1a5224c38c1fa0b85ef633ff935fae9dd83 (patch)
tree       d0422246a272108bbae88c6d46c42f932bc1a4c4
parent     8ea3f4eaec65ee4277f9943063fcc9488d3fa924 (diff)
[SPARK-16351][SQL] Avoid per-record type dispatch in JSON when writing
## What changes were proposed in this pull request?

Currently, `JacksonGenerator.apply` performs type-based dispatch for every row in order to write the appropriate values. This is unnecessary because the schema is already known, so the appropriate writers can be created once from the schema and then applied to each row. This approach is similar to `CatalystWriteSupport`.

This PR changes `JacksonGenerator` so that it creates all writers for the schema once and then applies them to each row, rather than dispatching on types for every row.

The benchmark was run with the code below:

```scala
test("Benchmark for JSON writer") {
  val N = 500 << 8
  val row =
    """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }"""
  val df = spark.sqlContext.read.json(spark.sparkContext.parallelize(List.fill(N)(row)))
  val benchmark = new Benchmark("JSON writer", N)
  benchmark.addCase("writing JSON file", 10) { _ =>
    withTempPath { path =>
      df.write.format("json").save(path.getCanonicalPath)
    }
  }
  benchmark.run()
}
```

This produced the results below:

- **Before**

```
JSON writer:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
writing JSON file                         1675 / 1767          0.1       13087.5       1.0X
```

- **After**

```
JSON writer:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
writing JSON file                         1597 / 1686          0.1       12477.1       1.0X
```

In addition, I ran this benchmark 10 times for each case and averaged the elapsed times:

| **Before** | **After** |
|------------|-----------|
| 17478 ms   | 16669 ms  |

This is roughly a ~5% improvement.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14028 from HyukjinKwon/SPARK-16351.
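The core change is easiest to see in isolation. Below is a minimal, self-contained sketch of the pattern the commit message describes, using toy types; `FieldType`, `Row`, `ValueWriter`, and the object name are illustrative only, not Spark's API. Writers are resolved from the schema once, and each record then only invokes the pre-resolved writers.

```scala
// Minimal, self-contained sketch of the pattern (toy types, not Spark code):
// dispatch on field types once, when the schema is known, and reuse the
// resulting writer functions for every record.
object WriterPerSchemaSketch {
  sealed trait FieldType
  case object IntFieldType extends FieldType
  case object StringFieldType extends FieldType

  type Row = Array[Any]
  // A ValueWriter renders one field of a row as JSON text.
  type ValueWriter = (Row, Int) => String

  // Type dispatch happens here, once per schema field.
  private def makeWriter(t: FieldType): ValueWriter = t match {
    case IntFieldType    => (row, i) => row(i).asInstanceOf[Int].toString
    case StringFieldType => (row, i) => "\"" + row(i).toString + "\""
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq("id" -> IntFieldType, "name" -> StringFieldType)
    val writers: Array[ValueWriter] = schema.map { case (_, t) => makeWriter(t) }.toArray

    val rows: Seq[Row] = Seq(Array(1, "a"), Array(2, "b"))
    rows.foreach { row =>
      // Per record, only the pre-resolved writers run; no pattern match on types.
      val fields = schema.indices.map(i => "\"" + schema(i)._1 + "\":" + writers(i)(row, i))
      println(fields.mkString("{", ",", "}"))
    }
  }
}
```

The patch below applies the same idea with Spark's `SpecializedGetters` and Jackson's `JsonGenerator`: `makeWriter` builds a `ValueWriter` per field when the `JacksonGenerator` is constructed, and `write(row)` only invokes those cached writers.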
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                                          4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala    218
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala        5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala             3
4 files changed, 163 insertions(+), 67 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ed4ccdb4c8..b28ecb753f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2489,12 +2489,12 @@ class Dataset[T] private[sql](
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
- val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+ val gen = new JacksonGenerator(rowSchema, writer)
new Iterator[String] {
override def hasNext: Boolean = iter.hasNext
override def next(): String = {
- JacksonGenerator(rowSchema, gen)(iter.next())
+ gen.write(iter.next())
gen.flush()
val json = writer.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 8b920ecafa..23f4a55491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -17,74 +17,174 @@
package org.apache.spark.sql.execution.datasources.json
+import java.io.Writer
+
import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._
-private[sql] object JacksonGenerator {
- /** Transforms a single InternalRow to JSON using Jackson
- *
- * TODO: make the code shared with the other apply method.
+private[sql] class JacksonGenerator(schema: StructType, writer: Writer) {
+ // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
+ // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
+ // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
+ private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+ // `ValueWriter`s for all fields of the schema
+ private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
+
+ private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+ private def makeWriter(dataType: DataType): ValueWriter = dataType match {
+ case NullType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNull()
+
+ case BooleanType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeBoolean(row.getBoolean(ordinal))
+
+ case ByteType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getByte(ordinal))
+
+ case ShortType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getShort(ordinal))
+
+ case IntegerType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getInt(ordinal))
+
+ case LongType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getLong(ordinal))
+
+ case FloatType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getFloat(ordinal))
+
+ case DoubleType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getDouble(ordinal))
+
+ case StringType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeString(row.getUTF8String(ordinal).toString)
+
+ case TimestampType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeString(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)).toString)
+
+ case DateType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeString(DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString)
+
+ case BinaryType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeBinary(row.getBinary(ordinal))
+
+ case dt: DecimalType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal)
+
+ case st: StructType =>
+ val fieldWriters = st.map(_.dataType).map(makeWriter)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters))
+
+ case at: ArrayType =>
+ val elementWriter = makeWriter(at.elementType)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
+
+ case mt: MapType =>
+ val valueWriter = makeWriter(mt.valueType)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
+
+ // For UDT values, they should be in the SQL type's corresponding value type.
+ // We should not see values in the user-defined class at here.
+ // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
+ // an ArrayData at here, instead of a Vector.
+ case t: UserDefinedType[_] =>
+ makeWriter(t.sqlType)
+
+ case _ =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val v = row.get(ordinal, dataType)
+ sys.error(s"Failed to convert value $v (class of ${v.getClass}}) " +
+ s"with the type of $dataType to JSON.")
+ }
+
+ private def writeObject(f: => Unit): Unit = {
+ gen.writeStartObject()
+ f
+ gen.writeEndObject()
+ }
+
+ private def writeFields(
+ row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+ var i = 0
+ while (i < row.numFields) {
+ val field = schema(i)
+ if (!row.isNullAt(i)) {
+ gen.writeFieldName(field.name)
+ fieldWriters(i).apply(row, i)
+ }
+ i += 1
+ }
+ }
+
+ private def writeArray(f: => Unit): Unit = {
+ gen.writeStartArray()
+ f
+ gen.writeEndArray()
+ }
+
+ private def writeArrayData(
+ array: ArrayData, fieldWriter: ValueWriter): Unit = {
+ var i = 0
+ while (i < array.numElements()) {
+ if (!array.isNullAt(i)) {
+ fieldWriter.apply(array, i)
+ } else {
+ gen.writeNull()
+ }
+ i += 1
+ }
+ }
+
+ private def writeMapData(
+ map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
+ val keyArray = map.keyArray()
+ val valueArray = map.valueArray()
+ var i = 0
+ while (i < map.numElements()) {
+ gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+ if (!valueArray.isNullAt(i)) {
+ fieldWriter.apply(valueArray, i)
+ } else {
+ gen.writeNull()
+ }
+ i += 1
+ }
+ }
+
+ def close(): Unit = gen.close()
+
+ def flush(): Unit = gen.flush()
+
+ /**
+ * Transforms a single InternalRow to JSON using Jackson
*
- * @param rowSchema the schema object used for conversion
- * @param gen a JsonGenerator object
* @param row The row to convert
*/
- def apply(rowSchema: StructType, gen: JsonGenerator)(row: InternalRow): Unit = {
- def valWriter: (DataType, Any) => Unit = {
- case (_, null) | (NullType, _) => gen.writeNull()
- case (StringType, v) => gen.writeString(v.toString)
- case (TimestampType, v: Long) => gen.writeString(DateTimeUtils.toJavaTimestamp(v).toString)
- case (IntegerType, v: Int) => gen.writeNumber(v)
- case (ShortType, v: Short) => gen.writeNumber(v)
- case (FloatType, v: Float) => gen.writeNumber(v)
- case (DoubleType, v: Double) => gen.writeNumber(v)
- case (LongType, v: Long) => gen.writeNumber(v)
- case (DecimalType(), v: Decimal) => gen.writeNumber(v.toJavaBigDecimal)
- case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
- case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
- case (BooleanType, v: Boolean) => gen.writeBoolean(v)
- case (DateType, v: Int) => gen.writeString(DateTimeUtils.toJavaDate(v).toString)
- // For UDT values, they should be in the SQL type's corresponding value type.
- // We should not see values in the user-defined class at here.
- // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
- // an ArrayData at here, instead of a Vector.
- case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, v)
-
- case (ArrayType(ty, _), v: ArrayData) =>
- gen.writeStartArray()
- v.foreach(ty, (_, value) => valWriter(ty, value))
- gen.writeEndArray()
-
- case (MapType(kt, vt, _), v: MapData) =>
- gen.writeStartObject()
- v.foreach(kt, vt, { (k, v) =>
- gen.writeFieldName(k.toString)
- valWriter(vt, v)
- })
- gen.writeEndObject()
-
- case (StructType(ty), v: InternalRow) =>
- gen.writeStartObject()
- var i = 0
- while (i < ty.length) {
- val field = ty(i)
- val value = v.get(i, field.dataType)
- if (value != null) {
- gen.writeFieldName(field.name)
- valWriter(field.dataType, value)
- }
- i += 1
- }
- gen.writeEndObject()
-
- case (dt, v) =>
- sys.error(
- s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
+ def write(row: InternalRow): Unit = {
+ writeObject {
+ writeFields(row, schema, rootFieldWriters)
}
-
- valWriter(rowSchema, row)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 86aef1f7d4..adca8d7af0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.CharArrayWriter
-import com.fasterxml.jackson.core.JsonFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
@@ -162,7 +161,7 @@ private[json] class JsonOutputWriter(
private[this] val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
- private[this] val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+ private[this] val gen = new JacksonGenerator(dataSchema, writer)
private[this] val result = new Text()
private val recordWriter: RecordWriter[NullWritable, Text] = {
@@ -181,7 +180,7 @@ private[json] class JsonOutputWriter(
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = {
- JacksonGenerator(dataSchema, gen)(row)
+ gen.write(row)
gen.flush()
result.set(writer.toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6c72019702..a09f61aba9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -21,10 +21,7 @@ import java.io.{File, StringWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
-import scala.collection.JavaConverters._
-
import com.fasterxml.jackson.core.JsonFactory
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
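For reference, here is a hedged usage sketch of the generator this patch introduces; it is not part of the diff. Because `JacksonGenerator` is `private[sql]`, the sketch declares a package under `org.apache.spark.sql`, just as the real callers in `Dataset.scala` and `JsonOutputWriter` above do. The constructor, `write`, and `flush` calls mirror the `Dataset.scala` change; the object and method names are hypothetical.

```scala
package org.apache.spark.sql.execution.datasources.json

import java.io.CharArrayWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object JacksonGeneratorUsageSketch {
  // Converts each InternalRow of `schema` to a JSON string, reusing one
  // generator (and therefore one set of pre-resolved field writers) for
  // every row instead of re-dispatching on field types per record.
  def toJsonStrings(schema: StructType, rows: Iterator[InternalRow]): Iterator[String] = {
    val writer = new CharArrayWriter()
    val gen = new JacksonGenerator(schema, writer) // field writers are built once, here
    rows.map { row =>
      gen.write(row)   // per-row call: applies the cached writers
      gen.flush()
      val json = writer.toString
      writer.reset()   // clear the buffer so records are not concatenated
      json
    }
    // NOTE: a production caller would also close `gen` once the iterator is drained,
    // as the Dataset.scala change does.
  }
}
```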