From b3862d3c59746ffb5f089aea4ff9e6f033a2c658 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 24 Sep 2015 12:52:11 -0700
Subject: [SPARK-10705] [SQL] Avoid using external rows in DataFrame.toJSON

JIRA: https://issues.apache.org/jira/browse/SPARK-10705

As described in the JIRA ticket, `DataFrame.toJSON` uses `DataFrame.mapPartitions`, which converts internal rows to external rows. We should use `queryExecution.toRdd.mapPartitions`, which operates directly on internal rows, for better performance.

Author: Liang-Chi Hsieh

Closes #8865 from viirya/df-tojson-internalrow.
---
 .../scala/org/apache/spark/sql/DataFrame.scala     |  2 +-
 .../execution/datasources/json/JSONRelation.scala  |  2 +-
 .../datasources/json/JacksonGenerator.scala        | 84 +---------------------
 3 files changed, 3 insertions(+), 85 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index a11140b717..f9995da3a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1564,7 +1564,7 @@ class DataFrame private[sql](
    */
   def toJSON: RDD[String] = {
     val rowSchema = this.schema
-    this.mapPartitions { iter =>
+    queryExecution.toRdd.mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
       val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 8ee0127c3b..d05e6efa83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -182,7 +182,7 @@ private[json] class JsonOutputWriter(
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    JacksonGenerator(dataSchema, gen, row)
+    JacksonGenerator(dataSchema, gen)(row)
     gen.flush()
 
     result.set(writer.toString)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 23bada1ddd..d7d6edeb6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -28,88 +28,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
 private[sql] object JacksonGenerator {
-  /** Transforms a single Row to JSON using Jackson
-   *
-   * @param rowSchema the schema object used for conversion
-   * @param gen a JsonGenerator object
-   * @param row The row to convert
-   */
-  def apply(rowSchema: StructType, gen: JsonGenerator)(row: Row): Unit = {
-    def valWriter: (DataType, Any) => Unit = {
-      case (_, null) | (NullType, _) => gen.writeNull()
-      case (StringType, v: String) => gen.writeString(v)
-      case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString)
-      case (IntegerType, v: Int) => gen.writeNumber(v)
-      case (ShortType, v: Short) => gen.writeNumber(v)
-      case (FloatType, v: Float) => gen.writeNumber(v)
-      case (DoubleType, v: Double) => gen.writeNumber(v)
-      case (LongType, v: Long) => gen.writeNumber(v)
-      case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v)
-      case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
-      case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
-      case (BooleanType, v: Boolean) => gen.writeBoolean(v)
-      case (DateType, v) => gen.writeString(v.toString)
-      case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v))
-
-      case (ArrayType(ty, _), v: Seq[_]) =>
-        gen.writeStartArray()
-        v.foreach(valWriter(ty, _))
-        gen.writeEndArray()
-
-      case (MapType(kv, vv, _), v: Map[_, _]) =>
-        gen.writeStartObject()
-        v.foreach { p =>
-          gen.writeFieldName(p._1.toString)
-          valWriter(vv, p._2)
-        }
-        gen.writeEndObject()
-
-      case (StructType(ty), v: Row) =>
-        gen.writeStartObject()
-        ty.zip(v.toSeq).foreach {
-          case (_, null) =>
-          case (field, v) =>
-            gen.writeFieldName(field.name)
-            valWriter(field.dataType, v)
-        }
-        gen.writeEndObject()
-
-      // For UDT, udt.serialize will produce SQL types. So, we need the following three cases.
-      case (ArrayType(ty, _), v: ArrayData) =>
-        gen.writeStartArray()
-        v.foreach(ty, (_, value) => valWriter(ty, value))
-        gen.writeEndArray()
-
-      case (MapType(kt, vt, _), v: MapData) =>
-        gen.writeStartObject()
-        v.foreach(kt, vt, { (k, v) =>
-          gen.writeFieldName(k.toString)
-          valWriter(vt, v)
-        })
-        gen.writeEndObject()
-
-      case (StructType(ty), v: InternalRow) =>
-        gen.writeStartObject()
-        var i = 0
-        while (i < ty.length) {
-          val field = ty(i)
-          val value = v.get(i, field.dataType)
-          if (value != null) {
-            gen.writeFieldName(field.name)
-            valWriter(field.dataType, value)
-          }
-          i += 1
-        }
-        gen.writeEndObject()
-
-      case (dt, v) =>
-        sys.error(
-          s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
-    }
-
-    valWriter(rowSchema, row)
-  }
-
   /** Transforms a single InternalRow to JSON using Jackson
    *
    * TODO: make the code shared with the other apply method.
@@ -118,7 +36,7 @@ private[sql] object JacksonGenerator {
    * @param gen a JsonGenerator object
    * @param row The row to convert
    */
-  def apply(rowSchema: StructType, gen: JsonGenerator, row: InternalRow): Unit = {
+  def apply(rowSchema: StructType, gen: JsonGenerator)(row: InternalRow): Unit = {
    def valWriter: (DataType, Any) => Unit = {
      case (_, null) | (NullType, _) => gen.writeNull()
      case (StringType, v) => gen.writeString(v.toString)
-- 
cgit v1.2.3
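
For illustration, the pattern the patch adopts can be sketched in isolation.
The Scala below is a sketch, not part of the commit: the object and method
names (ToJsonSketch, toJson) are hypothetical, it assumes Spark 1.5-era APIs,
and it would have to live in the org.apache.spark.sql package because
JacksonGenerator is private[sql], as the diff above shows.

package org.apache.spark.sql

import java.io.CharArrayWriter

import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator

object ToJsonSketch {
  // Functionally similar to the patched toJSON: each InternalRow is
  // serialized straight to JSON, with no InternalRow -> Row conversion.
  def toJson(df: DataFrame): RDD[String] = {
    val rowSchema = df.schema
    df.queryExecution.toRdd.mapPartitions { iter =>
      // One writer/generator pair per partition, reused across rows; a null
      // root value separator means no delimiter is inserted between records.
      val writer = new CharArrayWriter()
      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
      iter.map { internalRow =>
        // The curried apply from the patch: bind (schema, generator) once,
        // then feed it one InternalRow at a time.
        JacksonGenerator(rowSchema, gen)(internalRow)
        gen.flush()
        val json = writer.toString
        writer.reset() // reuse the buffer for the next record
        json
      }
    }
  }
}

The curried signature is also what lets JsonOutputWriter.writeInternal call
JacksonGenerator(dataSchema, gen)(row): the (schema, generator) parameter
list is fixed once per output writer, and only the row varies per call.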