diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-30 18:17:13 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-30 18:17:13 -0700 |
commit | 618f0ecb43fc0b9f36df0626465999f8685b310c (patch) | |
tree | 2930a8b88918e58be05d3da146581c9d667e7cfa | |
parent | 94bb7fd46e5586e1d08a99d21eecef93eeb4b97c (diff) | |
parent | 457bcd33436c91a6ef07591837ea048bb4bbcede (diff) | |
download | spark-618f0ecb43fc0b9f36df0626465999f8685b310c.tar.gz spark-618f0ecb43fc0b9f36df0626465999f8685b310c.tar.bz2 spark-618f0ecb43fc0b9f36df0626465999f8685b310c.zip |
Merge pull request #869 from AndreSchumacher/subtract
PySpark: implementing subtractByKey(), subtract() and keyBy()
-rw-r--r-- | python/pyspark/rdd.py | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1e9b3bb5c0..dfc518a7b0 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -754,6 +754,43 @@ class RDD(object): """ return python_cogroup(self, other, numPartitions) + def subtractByKey(self, other, numPartitions=None): + """ + Return each (key, value) pair in C{self} that has no pair with matching key + in C{other}. + + >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) + >>> y = sc.parallelize([("a", 3), ("c", None)]) + >>> sorted(x.subtractByKey(y).collect()) + [('b', 4), ('b', 5)] + """ + filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0 + map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]] + return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) + + def subtract(self, other, numPartitions=None): + """ + Return each value in C{self} that is not contained in C{other}. + + >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) + >>> y = sc.parallelize([("a", 3), ("c", None)]) + >>> sorted(x.subtract(y).collect()) + [('a', 1), ('b', 4), ('b', 5)] + """ + rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder + return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder + + def keyBy(self, f): + """ + Creates tuples of the elements in this RDD by applying C{f}. + + >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) + >>> y = sc.parallelize(zip(range(0,5), range(0,5))) + >>> sorted(x.cogroup(y).collect()) + [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] + """ + return self.map(lambda x: (f(x), x)) + # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. This could be an expensive operation, since those |