aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-24 11:38:17 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-24 11:38:17 -0800
commit9f79fd89dc84cda7ebeb98a0b43c8e982fefa787 (patch)
treea83e0237d6f3a8745924b05332e25d5772aa7d60 /python
parent0af7f84c8eb631cd2e427b692f407ec2d37dad64 (diff)
parentd63856c361cf47b1a508397ee9de38a7b5899fa0 (diff)
downloadspark-9f79fd89dc84cda7ebeb98a0b43c8e982fefa787.tar.gz
spark-9f79fd89dc84cda7ebeb98a0b43c8e982fefa787.tar.bz2
spark-9f79fd89dc84cda7ebeb98a0b43c8e982fefa787.zip
Merge branch 'apache-master' into filestream-fix
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/context.py3
-rw-r--r--python/pyspark/rdd.py10
2 files changed, 9 insertions, 4 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cbd41e58c4..0604f6836c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,7 +43,6 @@ class SparkContext(object):
_gateway = None
_jvm = None
_writeToFile = None
- _takePartition = None
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
@@ -134,8 +133,6 @@ class SparkContext(object):
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = \
SparkContext._jvm.PythonRDD.writeToFile
- SparkContext._takePartition = \
- SparkContext._jvm.PythonRDD.takePartition
if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 61720dcf1a..f87923e6fa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -54,6 +54,9 @@ class RDD(object):
self.ctx = ctx
self._jrdd_deserializer = jrdd_deserializer
+ def __repr__(self):
+ return self._jrdd.toString()
+
@property
def context(self):
"""
@@ -576,8 +579,13 @@ class RDD(object):
# Take only up to num elements from each partition we try
mapped = self.mapPartitions(takeUpToNum)
items = []
+ # TODO(shivaram): Similar to the scala implementation, update the take
+ # method to scan multiple splits based on an estimate of how many elements
+ # we have per-split.
for partition in range(mapped._jrdd.splits().size()):
- iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
+ partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+ partitionsToTake[0] = partition
+ iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
items.extend(mapped._collect_iterator_through_file(iterator))
if len(items) >= num:
break