diff options
author | Reynold Xin <rxin@apache.org> | 2013-12-19 13:35:09 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2013-12-19 13:35:09 -0800 |
commit | 7990c5637519ae2def30dfba19b7c83562c0ec00 (patch) | |
tree | 0c645fee598e3a9da68e5e8bb88bcafef651f04b /python/pyspark | |
parent | 440e531a5e7720c42f0c53ce98425b63b4194b7b (diff) | |
parent | 9cc3a6d3c0a64b80af77ae358c58d4b29b18c534 (diff) | |
download | spark-7990c5637519ae2def30dfba19b7c83562c0ec00.tar.gz spark-7990c5637519ae2def30dfba19b7c83562c0ec00.tar.bz2 spark-7990c5637519ae2def30dfba19b7c83562c0ec00.zip |
Merge pull request #276 from shivaram/collectPartition
Add collectPartition to JavaRDD interface.
This interface is useful for implementing `take` from other language frontends where the data is serialized. Also remove `takePartition` from PythonRDD and use `collectPartition` in rdd.py.
Thanks @concretevitamin for the original change and tests.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/context.py | 3 |
-rw-r--r-- | python/pyspark/rdd.py | 7 |
2 files changed, 6 insertions, 4 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cbd41e58c4..0604f6836c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,7 +43,6 @@ class SparkContext(object):
     _gateway = None
     _jvm = None
     _writeToFile = None
-    _takePartition = None
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
@@ -134,8 +133,6 @@ class SparkContext(object):
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeToFile = \
                     SparkContext._jvm.PythonRDD.writeToFile
-                SparkContext._takePartition = \
-                    SparkContext._jvm.PythonRDD.takePartition

         if instance:
             if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7cbc66d3c9..f87923e6fa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -579,8 +579,13 @@ class RDD(object):
         # Take only up to num elements from each partition we try
         mapped = self.mapPartitions(takeUpToNum)
         items = []
+        # TODO(shivaram): Similar to the scala implementation, update the take
+        # method to scan multiple splits based on an estimate of how many elements
+        # we have per-split.
         for partition in range(mapped._jrdd.splits().size()):
-            iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
+            partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+            partitionsToTake[0] = partition
+            iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
             items.extend(mapped._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break