author:    Reynold Xin <rxin@apache.org>  2013-12-19 13:35:09 -0800
committer: Reynold Xin <rxin@apache.org>  2013-12-19 13:35:09 -0800
commit:    7990c5637519ae2def30dfba19b7c83562c0ec00 (patch)
tree:      0c645fee598e3a9da68e5e8bb88bcafef651f04b /python
parent:    440e531a5e7720c42f0c53ce98425b63b4194b7b (diff)
parent:    9cc3a6d3c0a64b80af77ae358c58d4b29b18c534 (diff)
Merge pull request #276 from shivaram/collectPartition
Add collectPartition to the JavaRDD interface. This method is useful for implementing `take` in other language frontends, where the data is serialized. Also remove `takePartition` from PythonRDD and use `collectPartition` in rdd.py. Thanks @concretevitamin for the original change and tests.
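The idea behind the change, sketched below in Python: a frontend's `take` can pull one partition at a time across the language boundary and stop as soon as it has enough elements, so a collect-one-partition primitive is all the JVM side needs to expose. In this sketch, `num_partitions` and `collect_partition` are hypothetical stand-ins for a JVM-backed RDD handle, not Spark's actual API.

# A minimal sketch of the take-from-a-frontend pattern this change enables.
# `num_partitions` and `collect_partition` are hypothetical stand-ins, not
# Spark's actual API.
def take(rdd, num):
    """Return the first `num` elements, fetching one partition at a time."""
    items = []
    for partition in range(rdd.num_partitions()):
        # Only this partition's serialized data crosses the JVM boundary.
        items.extend(rdd.collect_partition(partition))
        if len(items) >= num:
            break
    return items[:num]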
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py  3
-rw-r--r--  python/pyspark/rdd.py      7
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cbd41e58c4..0604f6836c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,7 +43,6 @@ class SparkContext(object):
     _gateway = None
     _jvm = None
     _writeToFile = None
-    _takePartition = None
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
@@ -134,8 +133,6 @@ class SparkContext(object):
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeToFile = \
                     SparkContext._jvm.PythonRDD.writeToFile
-                SparkContext._takePartition = \
-                    SparkContext._jvm.PythonRDD.takePartition
 
             if instance:
                 if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
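For context, the attributes touched above are Py4J handles to JVM-side static methods, cached once when the gateway is launched; after this change only `writeToFile` is cached, because `take` now calls `collectPartitions` directly on the Java RDD object. A minimal sketch of that caching pattern, assuming a plain Py4J gateway JVM is already running and exposes a `PythonRDD` class (this is not Spark's actual startup code):

# Sketch of the Py4J handle-caching pattern, assuming a gateway JVM is
# already running and exposes a PythonRDD class.
from py4j.java_gateway import JavaGateway

gateway = JavaGateway()          # connects to the already-running gateway
jvm = gateway.jvm
# Looking the attribute up once yields a callable handle to the JVM-side
# static method; callers can then reuse the handle without repeating the
# lookup.
write_to_file = jvm.PythonRDD.writeToFile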
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7cbc66d3c9..f87923e6fa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -579,8 +579,13 @@ class RDD(object):
         # Take only up to num elements from each partition we try
         mapped = self.mapPartitions(takeUpToNum)
         items = []
+        # TODO(shivaram): Similar to the Scala implementation, update the take
+        # method to scan multiple splits based on an estimate of how many elements
+        # we have per-split.
         for partition in range(mapped._jrdd.splits().size()):
-            iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
+            partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+            partitionsToTake[0] = partition
+            iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
             items.extend(mapped._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break
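The `new_array` call above is needed because `collectPartitions` takes a Java `int[]`, which has no direct Python equivalent, so Py4J must allocate the array on the JVM side. A self-contained sketch of that pattern, assuming only a running Py4J gateway (no Spark required):

# Sketch of Py4J's primitive-array pattern, assuming a gateway JVM is running.
from py4j.java_gateway import JavaGateway

gateway = JavaGateway()
int_array = gateway.new_array(gateway.jvm.int, 1)  # allocates a Java int[1]
int_array[0] = 7                                   # elements assign list-style
print(len(int_array))                              # -> 1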