aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@apache.org>2014-01-23 20:01:36 -0800
committerJosh Rosen <joshrosen@apache.org>2014-01-23 20:01:36 -0800
commit4cebb79c9f3067da0c533292de45d7ecf56f2ff2 (patch)
tree32af23ff5ae09d8cb92b3ef7d80878f2b2dfa8eb /python/pyspark/rdd.py
parentff44732171730fd9e5db005062a45464a3801358 (diff)
downloadspark-4cebb79c9f3067da0c533292de45d7ecf56f2ff2.tar.gz
spark-4cebb79c9f3067da0c533292de45d7ecf56f2ff2.tar.bz2
spark-4cebb79c9f3067da0c533292de45d7ecf56f2ff2.zip
Deprecate mapPartitionsWithSplit in PySpark.
Also, replace the last reference to it in the docs. This fixes SPARK-1026.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py25
1 file changed, 21 insertions, 4 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6fb4a7b3be..1ad4b52987 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -27,6 +27,7 @@ import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
+import warnings
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, pack_long
@@ -179,7 +180,7 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(s, iterator): return chain.from_iterable(imap(f, iterator))
- return self.mapPartitionsWithSplit(func, preservesPartitioning)
+ return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
"""
@@ -191,10 +192,24 @@ class RDD(object):
[3, 7]
"""
def func(s, iterator): return f(iterator)
- return self.mapPartitionsWithSplit(func)
+ return self.mapPartitionsWithIndex(func)
+
+ def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+ """
+ Return a new RDD by applying a function to each partition of this RDD,
+ while tracking the index of the original partition.
+
+ >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
+ >>> def f(splitIndex, iterator): yield splitIndex
+ >>> rdd.mapPartitionsWithIndex(f).sum()
+ 6
+ """
+ return PipelinedRDD(self, f, preservesPartitioning)
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
"""
+ Deprecated: use mapPartitionsWithIndex instead.
+
Return a new RDD by applying a function to each partition of this RDD,
while tracking the index of the original partition.
@@ -203,7 +218,9 @@ class RDD(object):
>>> rdd.mapPartitionsWithSplit(f).sum()
6
"""
- return PipelinedRDD(self, f, preservesPartitioning)
+ warnings.warn("mapPartitionsWithSplit is deprecated; "
+ "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+ return self.mapPartitionsWithIndex(f, preservesPartitioning)
def filter(self, f):
"""
@@ -235,7 +252,7 @@ class RDD(object):
>>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
"""
- return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
+ return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed):