author     Liang-Chi Hsieh <viirya@appier.com>     2015-09-24 12:52:11 -0700
committer  Yin Huai <yhuai@databricks.com>         2015-09-24 12:52:11 -0700
commit     b3862d3c59746ffb5f089aea4ff9e6f033a2c658 (patch)
tree       a8d508b2dfb065dce4c0b166aead5de02d77a24e
parent     341b13f8f5eb118f1fb4d4f84418715ac4750a4d (diff)
[SPARK-10705] [SQL] Avoid using external rows in DataFrame.toJSON
JIRA: https://issues.apache.org/jira/browse/SPARK-10705

As described in the JIRA ticket, `DataFrame.toJSON` uses `DataFrame.mapPartitions`, which converts each internal row to an external row. We should use `queryExecution.toRdd.mapPartitions` instead, which operates directly on internal rows, for better performance.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #8865 from viirya/df-tojson-internalrow.
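For background, the two entry points expose different row representations. An illustrative sketch (not part of the patch; `df` stands for an arbitrary DataFrame):

// Illustrative only: `df` is a hypothetical DataFrame built elsewhere.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Old path: DataFrame.mapPartitions decodes every InternalRow into an
// external org.apache.spark.sql.Row before the user function sees it,
// paying one conversion per record.
val viaExternalRows: RDD[String] = df.mapPartitions { iter =>
  iter.map(row => row.toString)
}

// New path: queryExecution.toRdd exposes the physical plan's output as
// InternalRow (Catalyst-native values such as UTF8String), no conversion.
val viaInternalRows: RDD[InternalRow] = df.queryExecution.toRdd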
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                                    |  2 +-
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala     |  2 +-
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala | 84 +----
3 files changed, 3 insertions(+), 85 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index a11140b717..f9995da3a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1564,7 +1564,7 @@ class DataFrame private[sql](
*/
def toJSON: RDD[String] = {
val rowSchema = this.schema
- this.mapPartitions { iter =>
+ queryExecution.toRdd.mapPartitions { iter =>
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
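Reading the hunk in context, the patched method body is roughly the following sketch; the per-record loop sits below the hunk's context and is reconstructed here, so treat it as an approximation rather than the verbatim result:

// Sketch of the patched DataFrame.toJSON; everything past `val gen` is an
// assumption inferred from the call sites visible in this diff.
def toJSON: RDD[String] = {
  val rowSchema = this.schema
  queryExecution.toRdd.mapPartitions { iter =>
    val writer = new CharArrayWriter()
    // create the Generator without separator inserted between 2 records
    val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
    iter.map { row =>
      JacksonGenerator(rowSchema, gen)(row) // curried apply, see below
      gen.flush()
      val json = writer.toString
      writer.reset() // reuse the buffer for the next record
      json
    }
  }
}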
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 8ee0127c3b..d05e6efa83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -182,7 +182,7 @@ private[json] class JsonOutputWriter(
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = {
- JacksonGenerator(dataSchema, gen, row)
+ JacksonGenerator(dataSchema, gen)(row)
gen.flush()
result.set(writer.toString)
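The call-site change above mirrors the currying of `JacksonGenerator.apply` in the next file: with `row` moved to a second parameter list, a caller can bind the schema and generator once and reuse the resulting function. A hypothetical usage sketch (`internalRows` is illustrative; `dataSchema` and `gen` come from the surrounding writer):

// Partially apply once, then call the resulting InternalRow => Unit per record.
val writeRow: InternalRow => Unit = JacksonGenerator(dataSchema, gen)
internalRows.foreach { row => // `internalRows`: a hypothetical Iterator[InternalRow]
  writeRow(row)
  gen.flush()
}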
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 23bada1ddd..d7d6edeb6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -28,88 +28,6 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
private[sql] object JacksonGenerator {
- /** Transforms a single Row to JSON using Jackson
- *
- * @param rowSchema the schema object used for conversion
- * @param gen a JsonGenerator object
- * @param row The row to convert
- */
- def apply(rowSchema: StructType, gen: JsonGenerator)(row: Row): Unit = {
- def valWriter: (DataType, Any) => Unit = {
- case (_, null) | (NullType, _) => gen.writeNull()
- case (StringType, v: String) => gen.writeString(v)
- case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString)
- case (IntegerType, v: Int) => gen.writeNumber(v)
- case (ShortType, v: Short) => gen.writeNumber(v)
- case (FloatType, v: Float) => gen.writeNumber(v)
- case (DoubleType, v: Double) => gen.writeNumber(v)
- case (LongType, v: Long) => gen.writeNumber(v)
- case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v)
- case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
- case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
- case (BooleanType, v: Boolean) => gen.writeBoolean(v)
- case (DateType, v) => gen.writeString(v.toString)
- case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v))
-
- case (ArrayType(ty, _), v: Seq[_]) =>
- gen.writeStartArray()
- v.foreach(valWriter(ty, _))
- gen.writeEndArray()
-
- case (MapType(kv, vv, _), v: Map[_, _]) =>
- gen.writeStartObject()
- v.foreach { p =>
- gen.writeFieldName(p._1.toString)
- valWriter(vv, p._2)
- }
- gen.writeEndObject()
-
- case (StructType(ty), v: Row) =>
- gen.writeStartObject()
- ty.zip(v.toSeq).foreach {
- case (_, null) =>
- case (field, v) =>
- gen.writeFieldName(field.name)
- valWriter(field.dataType, v)
- }
- gen.writeEndObject()
-
- // For UDT, udt.serialize will produce SQL types. So, we need the following three cases.
- case (ArrayType(ty, _), v: ArrayData) =>
- gen.writeStartArray()
- v.foreach(ty, (_, value) => valWriter(ty, value))
- gen.writeEndArray()
-
- case (MapType(kt, vt, _), v: MapData) =>
- gen.writeStartObject()
- v.foreach(kt, vt, { (k, v) =>
- gen.writeFieldName(k.toString)
- valWriter(vt, v)
- })
- gen.writeEndObject()
-
- case (StructType(ty), v: InternalRow) =>
- gen.writeStartObject()
- var i = 0
- while (i < ty.length) {
- val field = ty(i)
- val value = v.get(i, field.dataType)
- if (value != null) {
- gen.writeFieldName(field.name)
- valWriter(field.dataType, value)
- }
- i += 1
- }
- gen.writeEndObject()
-
- case (dt, v) =>
- sys.error(
- s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
- }
-
- valWriter(rowSchema, row)
- }
-
/** Transforms a single InternalRow to JSON using Jackson
*
* TODO: make the code shared with the other apply method.
@@ -118,7 +36,7 @@ private[sql] object JacksonGenerator {
* @param gen a JsonGenerator object
* @param row The row to convert
*/
- def apply(rowSchema: StructType, gen: JsonGenerator, row: InternalRow): Unit = {
+ def apply(rowSchema: StructType, gen: JsonGenerator)(row: InternalRow): Unit = {
def valWriter: (DataType, Any) => Unit = {
case (_, null) | (NullType, _) => gen.writeNull()
case (StringType, v) => gen.writeString(v.toString)