Diffstat (limited to 'sql/core/src')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala	10
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 3ee2ea05cf..fbec2f9f4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 
 import java.util.{List => JList}
 
+import org.apache.spark.api.python.SerDeUtil
+
 import scala.collection.JavaConversions._
 
 import net.razorvine.pickle.Pickler
@@ -385,12 +387,8 @@ class SchemaRDD(
    */
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldTypes = schema.fields.map(_.dataType)
-    this.mapPartitions { iter =>
-      val pickle = new Pickler
-      iter.map { row =>
-        EvaluatePython.rowToArray(row, fieldTypes)
-      }.grouped(100).map(batched => pickle.dumps(batched.toArray))
-    }
+    val jrdd = this.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
+    SerDeUtil.javaToPython(jrdd)
   }
 
   /**
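
For context, the removed lines pickled rows by hand: one net.razorvine.pickle.Pickler per partition, each row converted with EvaluatePython.rowToArray and serialized in batches of 100 so the Python side can unpickle whole batches at once. The replacement converts rows the same way but hands the resulting JavaRDD to SerDeUtil.javaToPython, which takes over the batching and pickling. The Scala sketch below approximates the old per-partition pattern on a plain RDD; the helper name pickleRowsInBatches and the RDD[Array[Any]] element type are illustrative, not part of this commit.

import net.razorvine.pickle.Pickler
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD

// Sketch of the removed approach: reuse a single Pickler per partition and
// pickle rows in groups of 100 to amortize serialization overhead.
def pickleRowsInBatches(rows: RDD[Array[Any]]): JavaRDD[Array[Byte]] = {
  rows.mapPartitions { iter =>
    val pickle = new Pickler
    iter.grouped(100).map(batch => pickle.dumps(batch.toArray))
  }.toJavaRDD()
}

Batching and reusing the Pickler keep per-row overhead low; the committed change gets the same effect by delegating that work to SerDeUtil.javaToPython instead of duplicating it inside SchemaRDD.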