author | Davies Liu <davies@databricks.com> | 2015-02-17 16:54:57 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-17 16:54:57 -0800 |
commit | c3d2b90bde2e11823909605d518167548df66bd8 | |
tree | eab646a984d8c91b533789fc07fea1221cfe6460 | /python/pyspark/streaming |
parent | 117121a4ecaadda156a82255333670775e7727db | |
[SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark
Currently, PySpark does not support narrow dependencies for cogroup/join: even when the two RDDs share the same partitioner, an unnecessary extra shuffle stage is introduced.
The Python implementation of cogroup/join differs from the Scala one; it is built on union() and partitionBy(). This patch uses PartitionerAwareUnionRDD() in union() when all the RDDs have the same partitioner. It also fixes the `preservesPartitioning` flag in map() and mapPartitions(), so that partitionBy() can skip the unnecessary shuffle stage.
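For illustration, a minimal PySpark sketch of the behavior this patch enables (the session setup and names here are illustrative, not part of the patch):

```python
# Sketch only: demonstrates the case SPARK-5785 optimizes. With this patch,
# joining two RDDs that already share a partitioner no longer adds a shuffle.
from pyspark import SparkContext

sc = SparkContext("local[2]", "narrow-join-demo")

# Both pair RDDs are hash-partitioned into 4 partitions by the same partitioner.
a = sc.parallelize([(i, i) for i in range(100)]).partitionBy(4)
b = sc.parallelize([(i, i * 2) for i in range(100)]).partitionBy(4)

# Before this patch, the join below still introduced a shuffle stage; with it,
# the union underneath cogroup becomes a PartitionerAwareUnionRDD and the
# trailing partitionBy() is skipped, yielding a narrow dependency.
joined = a.join(b, 4)
print(joined.count())
```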
Author: Davies Liu <davies@databricks.com>
Closes #4629 from davies/narrow and squashes the following commits:
dffe34e [Davies Liu] improve test, check number of stages for join/cogroup
1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into narrow
4d29932 [Davies Liu] address comment
cc28d97 [Davies Liu] add unit tests
940245e [Davies Liu] address comments
ff5a0a6 [Davies Liu] skip the partitionBy() on Python side
eb26c62 [Davies Liu] narrow dependency in PySpark
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r-- | python/pyspark/streaming/dstream.py | 2 |
1 file changed, 1 insertion, 1 deletion
```diff
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 2fe39392ff..3fa4244423 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -578,7 +578,7 @@ class DStream(object):
             if a is None:
                 g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
             else:
-                g = a.cogroup(b, numPartitions)
+                g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
             g = g.mapValues(lambda (va, vb): (list(vb), list(va)[0] if len(va) else None))
             state = g.mapValues(lambda (vs, s): updateFunc(vs, s))
             return state.filter(lambda (k, v): v is not None)
```
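Note that the tuple-unpacking lambdas in this hunk (e.g. `lambda (va, vb): ...`) are Python 2 syntax, since removed in Python 3. A rough sketch of the same logic in Python 3 style, assuming the enclosing reduceFunc and the updateFunc/numPartitions variables from dstream.py, would be:

```python
# Sketch only (assumes the surrounding reduceFunc in dstream.py); Python 3
# dropped tuple-unpacking lambdas, so the pairs are indexed explicitly.
if a is None:
    g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
else:
    # Pre-partitioning b with the same numPartitions gives both sides of the
    # cogroup the same partitioner, enabling the narrow-dependency path.
    g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
g = g.mapValues(lambda va_vb: (list(va_vb[1]),
                               list(va_vb[0])[0] if len(va_vb[0]) else None))
state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
return state.filter(lambda k_v: k_v[1] is not None)
```

The key change is `b.partitionBy(numPartitions)`: here `a` is the previous state stream and `b` the new batch, so partitioning `b` up front gives both cogroup inputs the same partitioner and lets the cogroup avoid reshuffling the state RDD on every batch.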