| Field | Value | Date |
|---|---|---|
| author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-08 16:04:41 -0800 |
| committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-08 16:05:02 -0800 |
| commit | b57dd0f16024a82dfc223e69528b9908b931f068 (patch) | |
| tree | 8ad1222593da58eaeb7746aecaef2c41c5313f71 | |
| parent | 33beba39656fc64984db09a82fc69ca4edcc02d4 (diff) | |
Add mapPartitionsWithSplit() to PySpark.
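
The new method mirrors `mapPartitionsWithSplit` on the Scala `RDD`: the user-supplied function now receives the partition (split) index as its first argument, ahead of the iterator over that partition's elements. A minimal usage sketch, assuming a local `SparkContext`; the input values follow the doctest added in `rdd.py`, everything else is illustrative:

```python
from pyspark import SparkContext

sc = SparkContext("local", "mapPartitionsWithSplitExample")

# Four elements across four partitions, so each partition holds one value.
rdd = sc.parallelize([1, 2, 3, 4], 4)

# The function receives the partition index first, then an iterator over
# that partition's elements; here it simply emits the index.
def f(split_index, iterator):
    yield split_index

# Partition indices are 0, 1, 2 and 3, so the sum is 6.
print(rdd.mapPartitionsWithSplit(f).sum())
```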
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 5 |
| -rw-r--r-- | docs/python-programming-guide.md | 1 |
| -rw-r--r-- | python/pyspark/rdd.py | 33 |
| -rw-r--r-- | python/pyspark/worker.py | 4 |

4 files changed, 30 insertions, 13 deletions
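
These files change together because the split index has to travel from the JVM to the Python worker: `PythonRDD.scala` now writes it to the worker's stdin before the broadcast variables, serialized function, and pickled data, and `worker.py` reads it back first and passes it into the user function. A rough sketch of that read order, using hypothetical helpers modeled on PySpark's `read_int`/`read_long` utilities (not the actual implementation; the length-prefix framing is an assumption):

```python
import struct

# Hypothetical helpers in the spirit of pyspark's read_int / read_long;
# DataOutputStream on the JVM side writes big-endian integers.
def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

def read_long(stream):
    return struct.unpack("!q", stream.read(8))[0]

def read_worker_header(stream):
    """Sketch of the order in which the worker now consumes its stdin."""
    split_index = read_int(stream)            # new: partition (split) index
    num_broadcast = read_int(stream)          # number of broadcast variables
    broadcasts = {}
    for _ in range(num_broadcast):
        bid = read_long(stream)               # broadcast id
        length = read_int(stream)             # assumed length-prefixed value
        broadcasts[bid] = stream.read(length) # pickled broadcast value
    # The serialized user function and the pickled data records follow.
    return split_index, broadcasts
```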
```diff
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 79d824d494..f431ef28d3 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -65,6 +65,9 @@ private[spark] class PythonRDD[T: ClassManifest](
         SparkEnv.set(env)
         val out = new PrintWriter(proc.getOutputStream)
         val dOut = new DataOutputStream(proc.getOutputStream)
+        // Split index
+        dOut.writeInt(split.index)
+        // Broadcast variables
         dOut.writeInt(broadcastVars.length)
         for (broadcast <- broadcastVars) {
           dOut.writeLong(broadcast.id)
@@ -72,10 +75,12 @@ private[spark] class PythonRDD[T: ClassManifest](
           dOut.write(broadcast.value)
           dOut.flush()
         }
+        // Serialized user code
         for (elem <- command) {
           out.println(elem)
         }
         out.flush()
+        // Data values
         for (elem <- parent.iterator(split, context)) {
           PythonRDD.writeAsPickle(elem, dOut)
         }
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index d963551296..78ef310a00 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -19,7 +19,6 @@ There are a few key differences between the Python and Scala APIs:
 - Accumulators
 - Special functions on RDDs of doubles, such as `mean` and `stdev`
 - `lookup`
-- `mapPartitionsWithSplit`
 - `persist` at storage levels other than `MEMORY_ONLY`
 - `sample`
 - `sort`
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4ba417b2a2..1d36da42b0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -55,7 +55,7 @@ class RDD(object):
         """
         Return a new RDD containing the distinct elements in this RDD.
         """
-        def func(iterator): return imap(f, iterator)
+        def func(split, iterator): return imap(f, iterator)
         return PipelinedRDD(self, func, preservesPartitioning)
 
     def flatMap(self, f, preservesPartitioning=False):
@@ -69,8 +69,8 @@ class RDD(object):
         >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
         [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
         """
-        def func(iterator): return chain.from_iterable(imap(f, iterator))
-        return self.mapPartitions(func, preservesPartitioning)
+        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+        return self.mapPartitionsWithSplit(func, preservesPartitioning)
 
     def mapPartitions(self, f, preservesPartitioning=False):
         """
@@ -81,9 +81,20 @@ class RDD(object):
         >>> rdd.mapPartitions(f).collect()
         [3, 7]
         """
-        return PipelinedRDD(self, f, preservesPartitioning)
+        def func(s, iterator): return f(iterator)
+        return self.mapPartitionsWithSplit(func)
+
+    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
+        """
+        Return a new RDD by applying a function to each partition of this RDD,
+        while tracking the index of the original partition.
 
-    # TODO: mapPartitionsWithSplit
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
+        >>> def f(splitIndex, iterator): yield splitIndex
+        >>> rdd.mapPartitionsWithSplit(f).sum()
+        6
+        """
+        return PipelinedRDD(self, f, preservesPartitioning)
 
     def filter(self, f):
         """
@@ -362,7 +373,7 @@ class RDD(object):
         >>> ''.join(input(glob(tempFile.name + "/part-0000*")))
         '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
         """
-        def func(iterator):
+        def func(split, iterator):
             return (str(x).encode("utf-8") for x in iterator)
         keyed = PipelinedRDD(self, func)
         keyed._bypass_serializer = True
@@ -500,7 +511,7 @@ class RDD(object):
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numSplits) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
-        def add_shuffle_key(iterator):
+        def add_shuffle_key(split, iterator):
             buckets = defaultdict(list)
             for (k, v) in iterator:
                 buckets[hashFunc(k) % numSplits].append((k, v))
@@ -653,8 +664,8 @@ class PipelinedRDD(RDD):
     def __init__(self, prev, func, preservesPartitioning=False):
         if isinstance(prev, PipelinedRDD) and not prev.is_cached:
             prev_func = prev.func
-            def pipeline_func(iterator):
-                return func(prev_func(iterator))
+            def pipeline_func(split, iterator):
+                return func(split, prev_func(split, iterator))
             self.func = pipeline_func
             self.preservesPartitioning = \
                 prev.preservesPartitioning and preservesPartitioning
@@ -677,8 +688,8 @@ class PipelinedRDD(RDD):
         if not self._bypass_serializer and self.ctx.batchSize != 1:
             oldfunc = self.func
             batchSize = self.ctx.batchSize
-            def batched_func(iterator):
-                return batched(oldfunc(iterator), batchSize)
+            def batched_func(split, iterator):
+                return batched(oldfunc(split, iterator), batchSize)
             func = batched_func
         cmds = [func, self._bypass_serializer]
         pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 9f6b507dbd..3d792bbaa2 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -21,6 +21,7 @@ def load_obj():
 
 
 def main():
+    split_index = read_int(sys.stdin)
     num_broadcast_variables = read_int(sys.stdin)
     for _ in range(num_broadcast_variables):
         bid = read_long(sys.stdin)
@@ -32,7 +33,8 @@ def main():
         dumps = lambda x: x
     else:
         dumps = dump_pickle
-    for obj in func(read_from_pickle_file(sys.stdin)):
+    iterator = read_from_pickle_file(sys.stdin)
+    for obj in func(split_index, iterator):
         write_with_length(dumps(obj), old_stdout)
```
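
The `PipelinedRDD` changes hold together because every stage's function now shares the signature `(split, iterator)`, so chained stages can keep forwarding the same split index. A standalone sketch of that composition pattern (plain Python with illustrative stage functions; not the `PipelinedRDD` class itself):

```python
# Each stage takes (split_index, iterator) and returns an iterator, so two
# stages compose by threading the same split index through both of them,
# just as pipeline_func does in the diff above.
def tag_with_split(split_index, iterator):
    return ((split_index, x) for x in iterator)

def drop_odd_splits(split_index, iterator):
    return iterator if split_index % 2 == 0 else iter([])

def pipeline(prev_func, func):
    def pipeline_func(split_index, iterator):
        return func(split_index, prev_func(split_index, iterator))
    return pipeline_func

combined = pipeline(tag_with_split, drop_odd_splits)
print(list(combined(0, iter([1, 2]))))  # [(0, 1), (0, 2)]
print(list(combined(1, iter([3, 4]))))  # []
```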