From af0cd6bd27dda73b326bcb6a66addceadebf5e54 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 18 Dec 2013 11:40:07 -0800 Subject: Add collectPartition to JavaRDD interface. Also remove takePartition from PythonRDD and use collectPartition in rdd.py. --- python/pyspark/context.py | 3 --- python/pyspark/rdd.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) (limited to 'python') diff --git a/python/pyspark/context.py b/python/pyspark/context.py index cbd41e58c4..0604f6836c 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -43,7 +43,6 @@ class SparkContext(object): _gateway = None _jvm = None _writeToFile = None - _takePartition = None _next_accum_id = 0 _active_spark_context = None _lock = Lock() @@ -134,8 +133,6 @@ class SparkContext(object): SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = \ SparkContext._jvm.PythonRDD.writeToFile - SparkContext._takePartition = \ - SparkContext._jvm.PythonRDD.takePartition if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 61720dcf1a..d81b7c90c1 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -577,7 +577,7 @@ class RDD(object): mapped = self.mapPartitions(takeUpToNum) items = [] for partition in range(mapped._jrdd.splits().size()): - iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition) + iterator = mapped._jrdd.collectPartition(partition).iterator() items.extend(mapped._collect_iterator_through_file(iterator)) if len(items) >= num: break -- cgit v1.2.3