diff options
author | Davies Liu <davies@databricks.com> | 2014-11-03 23:56:14 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-11-03 23:56:14 -0800 |
commit | e4f42631a68b473ce706429915f3f08042af2119 (patch) | |
tree | 557ff754b9936addfb9628bfcba462802ff6ec1c /sql/core | |
parent | b671ce047d036b8923007902826038b01e836e8a (diff) | |
download | spark-e4f42631a68b473ce706429915f3f08042af2119.tar.gz spark-e4f42631a68b473ce706429915f3f08042af2119.tar.bz2 spark-e4f42631a68b473ce706429915f3f08042af2119.zip |
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplify serializer, always use batched serializer (AutoBatchedSerializer as default), even batch size is 1.
Author: Davies Liu <davies@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>
Closes #2920 from davies/fix_autobatch and squashes the following commits:
e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 10 |
1 files changed, 4 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 3ee2ea05cf..fbec2f9f4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import java.util.{List => JList} +import org.apache.spark.api.python.SerDeUtil + import scala.collection.JavaConversions._ import net.razorvine.pickle.Pickler @@ -385,12 +387,8 @@ class SchemaRDD( */ private[sql] def javaToPython: JavaRDD[Array[Byte]] = { val fieldTypes = schema.fields.map(_.dataType) - this.mapPartitions { iter => - val pickle = new Pickler - iter.map { row => - EvaluatePython.rowToArray(row, fieldTypes) - }.grouped(100).map(batched => pickle.dumps(batched.toArray)) - } + val jrdd = this.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD() + SerDeUtil.javaToPython(jrdd) } /** |