author     Kan Zhang <kzhang@apache.org>    2014-06-14 13:17:22 -0700
committer  Reynold Xin <rxin@apache.org>    2014-06-14 13:17:22 -0700
commit     2550533a28382664f8fd294b2caa494d12bfc7c1 (patch)
tree       6b68fa24234486ca8b9d130d39d515a5b64e5d5f
parent     891968509105d8d8cf5a608ad9473aeeed747089 (diff)
[SPARK-2079] Support batching when serializing SchemaRDD to Python
Added batching with default batch size 10 in SchemaRDD.javaToPython

Author: Kan Zhang <kzhang@apache.org>

Closes #1023 from kanzhang/SPARK-2079 and squashes the following commits:

2d1915e [Kan Zhang] [SPARK-2079] Add batching in SchemaRDD.javaToPython
19b0c09 [Kan Zhang] [SPARK-2079] Removing unnecessary wrapping in SchemaRDD.javaToPython
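For context, the Python side of this patch reads the pickled batches through BatchedSerializer wrapped around PickleSerializer. A minimal round-trip sketch of that serializer pair, run against an in-memory stream (the BytesIO buffer, the explicit batch size of 10, and the sample dicts are illustrative, not part of the patch):

    from io import BytesIO

    from pyspark.serializers import BatchedSerializer, PickleSerializer

    # Batch size 10 mirrors the default the JVM side now uses.
    ser = BatchedSerializer(PickleSerializer(), 10)

    # Serialize 25 dicts: they are written as three pickled batches (10 + 10 + 5).
    buf = BytesIO()
    ser.dump_stream(({"id": i} for i in range(25)), buf)

    # load_stream transparently flattens the batches back into single records,
    # which is why the .map(lambda d: Row(d)) in the diff below still sees one
    # dict at a time.
    buf.seek(0)
    records = list(ser.load_stream(buf))
    assert records == [{"id": i} for i in range(25)]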
 python/pyspark/sql.py                                        | 4 +++-
 sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 9 ++-------
 2 files changed, 5 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 960d0a8244..e344610b1f 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -16,6 +16,7 @@
#
from pyspark.rdd import RDD
+from pyspark.serializers import BatchedSerializer, PickleSerializer
from py4j.protocol import Py4JError
@@ -346,7 +347,8 @@ class SchemaRDD(RDD):
# TODO: This is inefficient, we should construct the Python Row object
# in Java land in the javaToPython function. May require a custom
# pickle serializer in Pyrolite
- return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
+ return RDD(jrdd, self._sc, BatchedSerializer(
+ PickleSerializer())).map(lambda d: Row(d))
# We override the default cache/persist/checkpoint behavior as we want to cache the underlying
# SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 821ac850ac..89eaba2d19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -347,16 +347,11 @@ class SchemaRDD(
val pickle = new Pickler
iter.map { row =>
val map: JMap[String, Any] = new java.util.HashMap
- // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict].
- // Ideally we should be able to pickle an object directly into a Python collection so we
- // don't have to create an ArrayList every time.
- val arr: java.util.ArrayList[Any] = new java.util.ArrayList
row.zip(fieldNames).foreach { case (obj, name) =>
map.put(name, obj)
}
- arr.add(map)
- pickle.dumps(arr)
- }
+ map
+ }.grouped(10).map(batched => pickle.dumps(batched.toArray))
}
}
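On the JVM side, each grouped(10) chunk is pickled as a single array, so each payload arriving in Python unpickles as one sequence of dicts, keyed by field name. A plain-pickle illustration of that shape (sample data is made up, and the bytes are not claimed to be identical to Pyrolite's output):

    import pickle

    # Each batch the JVM emits unpickles as a sequence of dicts, one per row --
    # the shape that BatchedSerializer then flattens back into single records.
    batch = pickle.dumps([{"name": "alice", "age": 1}, {"name": "bob", "age": 2}])
    rows = pickle.loads(batch)
    assert rows == [{"name": "alice", "age": 1}, {"name": "bob", "age": 2}]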