diff options
Diffstat (limited to 'python')
 python/pyspark/rdd.py           |  8 ++++----
 python/pyspark/sql/dataframe.py | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 37574cea0b..cd1f64e8aa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2299,14 +2299,14 @@ class RDD(object):
         """
         Return an iterator that contains all of the elements in this RDD.
         The iterator will consume as much memory as the largest partition in this RDD.
+
         >>> rdd = sc.parallelize(range(10))
         >>> [x for x in rdd.toLocalIterator()]
         [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
         """
-        for partition in range(self.getNumPartitions()):
-            rows = self.context.runJob(self, lambda x: x, [partition])
-            for row in rows:
-                yield row
+        with SCCallSiteSync(self.context) as css:
+            port = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd())
+        return _load_from_socket(port, self._jrdd_deserializer)
 
 
 def _prepare_for_python_RDD(sc, command):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 7a69c4c70c..d473d6b534 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -241,6 +241,20 @@ class DataFrame(object):
         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
 
     @ignore_unicode_prefix
+    @since(2.0)
+    def toLocalIterator(self):
+        """
+        Returns an iterator that contains all of the rows in this :class:`DataFrame`.
+        The iterator will consume as much memory as the largest partition in this DataFrame.
+
+        >>> list(df.toLocalIterator())
+        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
+        """
+        with SCCallSiteSync(self._sc) as css:
+            port = self._jdf.toPythonIterator()
+        return _load_from_socket(port, BatchedSerializer(PickleSerializer()))
+
+    @ignore_unicode_prefix
     @since(1.3)
     def limit(self, num):
         """Limits the result count to the number specified.