From 4cebb79c9f3067da0c533292de45d7ecf56f2ff2 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 23 Jan 2014 20:01:36 -0800
Subject: Deprecate mapPartitionsWithSplit in PySpark.

Also, replace the last reference to it in the docs.

This fixes SPARK-1026.
---
 python/pyspark/rdd.py | 25 +++++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6fb4a7b3be..1ad4b52987 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -27,6 +27,7 @@ import traceback
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
+import warnings
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, pack_long
@@ -179,7 +180,7 @@ class RDD(object):
         [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
         """
         def func(s, iterator): return chain.from_iterable(imap(f, iterator))
-        return self.mapPartitionsWithSplit(func, preservesPartitioning)
+        return self.mapPartitionsWithIndex(func, preservesPartitioning)
 
     def mapPartitions(self, f, preservesPartitioning=False):
         """
@@ -191,10 +192,24 @@ class RDD(object):
         [3, 7]
         """
         def func(s, iterator): return f(iterator)
-        return self.mapPartitionsWithSplit(func)
+        return self.mapPartitionsWithIndex(func)
+
+    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+        Return a new RDD by applying a function to each partition of this RDD,
+        while tracking the index of the original partition.
+
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
+        >>> def f(splitIndex, iterator): yield splitIndex
+        >>> rdd.mapPartitionsWithIndex(f).sum()
+        6
+        """
+        return PipelinedRDD(self, f, preservesPartitioning)
 
     def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
         """
+        Deprecated: use mapPartitionsWithIndex instead.
+
         Return a new RDD by applying a function to each partition of this RDD,
         while tracking the index of the original partition.
 
@@ -203,7 +218,9 @@
         >>> rdd.mapPartitionsWithSplit(f).sum()
         6
         """
-        return PipelinedRDD(self, f, preservesPartitioning)
+        warnings.warn("mapPartitionsWithSplit is deprecated; "
+            "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+        return self.mapPartitionsWithIndex(f, preservesPartitioning)
 
     def filter(self, f):
         """
@@ -235,7 +252,7 @@
         >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
         [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
         """
-        return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
+        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
 
     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed):
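
For reference, a minimal sketch of the behavior this patch introduces, run
against a local SparkContext (the "local" master string and app name below
are illustrative, not part of the patch):

    import warnings
    from pyspark import SparkContext

    sc = SparkContext("local", "map-partitions-example")
    rdd = sc.parallelize([1, 2, 3, 4], 4)

    # New name: f receives the partition index as its first argument.
    def f(splitIndex, iterator):
        yield splitIndex

    print(rdd.mapPartitionsWithIndex(f).sum())  # partitions 0+1+2+3 = 6

    # The old name still works, but calling it now emits a DeprecationWarning.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        rdd.mapPartitionsWithSplit(f).sum()  # same result: 6
        assert any(issubclass(w.category, DeprecationWarning) for w in caught)

    sc.stop()

Because the deprecated method now delegates to mapPartitionsWithIndex, both
calls go through the same PipelinedRDD path; only the warning differs.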