author    Cheng Hao <hao.cheng@intel.com>  2015-02-10 18:19:56 -0800
committer Michael Armbrust <michael@databricks.com>  2015-02-10 18:19:56 -0800
commit    a60aea86b4d4b716b5ec3bff776b509fe0831342 (patch)
tree      edd0220e8a69ee071b9d57aa9ad8dabb8e4868c4
parent    6195e2473b98253ccc9edc3d624ba2bf59ffc398 (diff)
[SPARK-5683] [SQL] Avoid creating multiple JSON generators
Author: Cheng Hao <hao.cheng@intel.com>

Closes #4468 from chenghao-intel/json and squashes the following commits:

aeb7801 [Cheng Hao] avoid creating multiple JSON generators
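In short, the patch stops allocating a fresh StringWriter and JsonGenerator for every row; each partition instead reuses one generator over a CharArrayWriter that is reset between records. A minimal standalone sketch of that reuse pattern outside Spark (the object name, field names, and sample records are illustrative, not from the patch):

    import java.io.CharArrayWriter
    import com.fasterxml.jackson.core.JsonFactory

    object GeneratorReuseSketch {
      def main(args: Array[String]): Unit = {
        val writer = new CharArrayWriter()
        // null root value separator: no space inserted between records
        val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

        val records = Seq(("A1", 1), ("B2", 2))
        val jsons = records.map { case (name, id) =>
          gen.writeStartObject()
          gen.writeStringField("name", name)
          gen.writeNumberField("id", id)
          gen.writeEndObject()
          gen.flush()          // push buffered output into the writer
          val json = writer.toString
          writer.reset()       // reuse the same buffer for the next record
          json
        }
        gen.close()
        jsons.foreach(println) // {"name":"A1","id":1} then {"name":"B2","id":2}
      }
    }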
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala  | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala   | 13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala |  8
3 files changed, 29 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 11f9334556..0134b038f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import java.io.CharArrayWriter
+
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -380,8 +382,26 @@ private[sql] class DataFrameImpl protected[sql](
override def toJSON: RDD[String] = {
val rowSchema = this.schema
this.mapPartitions { iter =>
- val jsonFactory = new JsonFactory()
- iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
+ val writer = new CharArrayWriter()
+ // create the Generator without separator inserted between 2 records
+ val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+ new Iterator[String] {
+ override def hasNext() = iter.hasNext
+ override def next(): String = {
+ JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
+ gen.flush()
+
+ val json = writer.toString
+ if (hasNext) {
+ writer.reset()
+ } else {
+ gen.close()
+ }
+
+ json
+ }
+ }
}
}
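Note that toJSON now returns a lazy Iterator: each next() call serializes one row, flushes the generator, snapshots the buffer, and then either resets the writer or, on the final row, closes the generator. A hedged generic sketch of that close-on-exhaustion iterator shape (closingIterator is a made-up helper, not part of the patch):

    import java.io.Closeable

    // Illustrative only: wrap an iterator so a shared resource is released
    // exactly once, when the last element has been produced.
    def closingIterator[A](underlying: Iterator[A], resource: Closeable): Iterator[A] =
      new Iterator[A] {
        override def hasNext: Boolean = underlying.hasNext
        override def next(): A = {
          val elem = underlying.next()
          if (!hasNext) resource.close() // last element: release the resource
          elem
        }
      }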
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 33ce71b51b..1043eefcfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -23,8 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.collection.Map
import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
-import com.fasterxml.jackson.core.JsonProcessingException
-import com.fasterxml.jackson.core.JsonFactory
+import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException, JsonFactory}
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.rdd.RDD
@@ -430,14 +429,11 @@ private[sql] object JsonRDD extends Logging {
/** Transforms a single Row to JSON using Jackson
*
- * @param jsonFactory a JsonFactory object to construct a JsonGenerator
* @param rowSchema the schema object used for conversion
+ * @param gen a JsonGenerator object
* @param row The row to convert
*/
- private[sql] def rowToJSON(rowSchema: StructType, jsonFactory: JsonFactory)(row: Row): String = {
- val writer = new StringWriter()
- val gen = jsonFactory.createGenerator(writer)
-
+ private[sql] def rowToJSON(rowSchema: StructType, gen: JsonGenerator)(row: Row) = {
def valWriter: (DataType, Any) => Unit = {
case (_, null) | (NullType, _) => gen.writeNull()
case (StringType, v: String) => gen.writeString(v)
@@ -479,8 +475,5 @@ private[sql] object JsonRDD extends Logging {
}
valWriter(rowSchema, row)
- gen.close()
- writer.toString
}
-
}
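With this change, rowToJSON writes into a caller-supplied JsonGenerator instead of building a StringWriter and returning a String, so nothing is allocated or closed per row. The curried parameter list lets a caller bind the schema and generator once per partition and reuse the resulting Row => Unit function. A simplified sketch of the same shape (recordToJSON and the Map-based record type are stand-ins for the real Row machinery):

    import com.fasterxml.jackson.core.JsonGenerator

    // Bind the generator once, then map the resulting record => Unit
    // function over every record in a partition.
    def recordToJSON(gen: JsonGenerator)(record: Map[String, Any]): Unit = {
      gen.writeStartObject()
      record.foreach {
        case (k, null)       => gen.writeNullField(k)
        case (k, v: String)  => gen.writeStringField(k, v)
        case (k, v: Int)     => gen.writeNumberField(k, v)
        case (k, v: Boolean) => gen.writeBooleanField(k, v)
        case (k, v)          => gen.writeStringField(k, v.toString)
      }
      gen.writeEndObject()
    }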
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 7870cf9b0a..4fc92e3e3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -824,8 +824,8 @@ class JsonSuite extends QueryTest {
df1.registerTempTable("applySchema1")
val df2 = df1.toDataFrame
val result = df2.toJSON.collect()
- assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
- assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
+ assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
+ assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
val schema2 = StructType(
StructField("f1", StructType(
@@ -846,8 +846,8 @@ class JsonSuite extends QueryTest {
val df4 = df3.toDataFrame
val result2 = df4.toJSON.collect()
- assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
- assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
+ assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
+ assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
val jsonDF = jsonRDD(primitiveFieldAndType)
val primTable = jsonRDD(jsonDF.toJSON)
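The test updates swap == for ScalaTest's ===, which reports both operands when an assertion fails ("... did not equal ...") rather than a bare "assertion failed". An illustrative snippet:

    import org.scalatest.Assertions._

    // On failure, === shows the actual and expected strings side by side.
    assert("""{"f1":1}""" === """{"f1":1}""")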