diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-01-23 20:53:18 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-23 20:53:18 -0800 |
commit | 05be7047744c88e64e7e6bd973f9bcfacd00da5f (patch) | |
tree | 1ebdf3d92388ac4d2cfdcc9b3ab533b30b68cd6c /python | |
parent | 3d6e75419330d27435becfdf8cfb0b6d20d56cf8 (diff) | |
parent | 4cebb79c9f3067da0c533292de45d7ecf56f2ff2 (diff) | |
download | spark-05be7047744c88e64e7e6bd973f9bcfacd00da5f.tar.gz spark-05be7047744c88e64e7e6bd973f9bcfacd00da5f.tar.bz2 spark-05be7047744c88e64e7e6bd973f9bcfacd00da5f.zip |
Merge pull request #505 from JoshRosen/SPARK-1026
Deprecate mapPartitionsWithSplit in PySpark (SPARK-1026)
This commit deprecates `mapPartitionsWithSplit` in PySpark (see [SPARK-1026](https://spark-project.atlassian.net/browse/SPARK-1026)) and removes the remaining references to it from the docs.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/rdd.py | 25 |
1 file changed, 21 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6fb4a7b3be..1ad4b52987 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -27,6 +27,7 @@ import traceback from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread +import warnings from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, pack_long @@ -179,7 +180,7 @@ class RDD(object): [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] """ def func(s, iterator): return chain.from_iterable(imap(f, iterator)) - return self.mapPartitionsWithSplit(func, preservesPartitioning) + return self.mapPartitionsWithIndex(func, preservesPartitioning) def mapPartitions(self, f, preservesPartitioning=False): """ @@ -191,10 +192,24 @@ class RDD(object): [3, 7] """ def func(s, iterator): return f(iterator) - return self.mapPartitionsWithSplit(func) + return self.mapPartitionsWithIndex(func) + + def mapPartitionsWithIndex(self, f, preservesPartitioning=False): + """ + Return a new RDD by applying a function to each partition of this RDD, + while tracking the index of the original partition. + + >>> rdd = sc.parallelize([1, 2, 3, 4], 4) + >>> def f(splitIndex, iterator): yield splitIndex + >>> rdd.mapPartitionsWithIndex(f).sum() + 6 + """ + return PipelinedRDD(self, f, preservesPartitioning) def mapPartitionsWithSplit(self, f, preservesPartitioning=False): """ + Deprecated: use mapPartitionsWithIndex instead. + Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. 
@@ -203,7 +218,9 @@ class RDD(object): >>> rdd.mapPartitionsWithSplit(f).sum() 6 """ - return PipelinedRDD(self, f, preservesPartitioning) + warnings.warn("mapPartitionsWithSplit is deprecated; " + "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) + return self.mapPartitionsWithIndex(f, preservesPartitioning) def filter(self, f): """ @@ -235,7 +252,7 @@ class RDD(object): >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] """ - return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True) + return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed): |