Diffstat (limited to 'python')
-rw-r--r--  python/lib/py4j0.7.jar          bin  103286 -> 0 bytes
-rw-r--r--  python/pyspark/java_gateway.py        2
-rw-r--r--  python/pyspark/rdd.py                37
3 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/python/lib/py4j0.7.jar b/python/lib/py4j0.7.jar
deleted file mode 100644
index 73b7ddb7d1..0000000000
--- a/python/lib/py4j0.7.jar
+++ /dev/null
Binary files differ
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index fdc9851479..3ccf062c86 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,7 +29,7 @@ SPARK_HOME = os.environ["SPARK_HOME"]
 def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and SPARK_MEM settings from spark-env.sh
-    command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
+    command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
                "--die-on-broken-pipe", "0"]
     # Don't send ctrl-c / SIGINT to the Java gateway:
     def preexec_function():
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1e9b3bb5c0..dfc518a7b0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -754,6 +754,43 @@ class RDD(object):
         """
         return python_cogroup(self, other, numPartitions)
 
+    def subtractByKey(self, other, numPartitions=None):
+        """
+        Return each (key, value) pair in C{self} that has no pair with matching key
+        in C{other}.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
+        >>> y = sc.parallelize([("a", 3), ("c", None)])
+        >>> sorted(x.subtractByKey(y).collect())
+        [('b', 4), ('b', 5)]
+        """
+        filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
+        map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
+
+    def subtract(self, other, numPartitions=None):
+        """
+        Return each value in C{self} that is not contained in C{other}.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
+        >>> y = sc.parallelize([("a", 3), ("c", None)])
+        >>> sorted(x.subtract(y).collect())
+        [('a', 1), ('b', 4), ('b', 5)]
+        """
+        rdd = other.map(lambda x: (x, True))  # note: here 'True' is just a placeholder
+        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])  # note: here 'True' is just a placeholder
+
+    def keyBy(self, f):
+        """
+        Creates tuples of the elements in this RDD by applying C{f}.
+
+        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
+        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
+        >>> sorted(x.cogroup(y).collect())
+        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
+        """
+        return self.map(lambda x: (f(x), x))
+
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
     # keys in the pairs. This could be an expensive operation, since those
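
The java_gateway.py hunk shows only the changed line; reassembled from its context lines, the launch path after this commit looks roughly like the sketch below. This is a hedged reconstruction, not the full file: the Popen keyword arguments beyond preexec_fn are assumptions.

# Sketch of launch_gateway() after this change: PySpark shells out to
# SPARK_HOME/spark-class (formerly SPARK_HOME/run) so the JVM picks up the
# classpath and SPARK_MEM settings from spark-env.sh.
import os
import signal
from subprocess import Popen, PIPE

SPARK_HOME = os.environ["SPARK_HOME"]

def launch_gateway_sketch():
    command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
               "--die-on-broken-pipe", "0"]
    def preexec_function():
        # Don't forward ctrl-c / SIGINT from the Python shell to the JVM;
        # --die-on-broken-pipe makes the gateway exit when Python does.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
    # The stdin/stdout plumbing here is an assumption; the real module reads
    # the gateway's port number from the subprocess's stdout.
    return Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_function)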
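
The three new rdd.py methods all bottom out in cogroup(): subtractByKey keeps a key only when the cogrouped result has values on the left and none on the right, and subtract reuses that by tagging each element with a placeholder True and keying on the element itself. Below is a minimal local sketch of that reduction, using plain Python lists in place of RDDs; local_cogroup and local_subtract_by_key are illustrative names, not PySpark API.

from collections import defaultdict

def local_cogroup(left, right):
    # Group both datasets by key into a pair of value lists, like RDD.cogroup.
    grouped = defaultdict(lambda: ([], []))
    for k, v in left:
        grouped[k][0].append(v)
    for k, v in right:
        grouped[k][1].append(v)
    return grouped.items()

def local_subtract_by_key(left, right):
    # Keep a key only if it has left-side values and no right-side values,
    # then flatten the surviving groups back into (key, value) pairs,
    # mirroring the filter_func/map_func pipeline in the diff above.
    return [(k, v)
            for k, (lvals, rvals) in local_cogroup(left, right)
            if lvals and not rvals
            for v in lvals]

x = [("a", 1), ("b", 4), ("b", 5), ("a", 2)]
y = [("a", 3), ("c", None)]
print(sorted(local_subtract_by_key(x, y)))  # [('b', 4), ('b', 5)]

# subtract() reuses the same machinery: tag every element with a placeholder
# True, treat the element itself as the key, then strip the tag afterwards.
xs = [("a", 1), ("b", 4), ("b", 5), ("a", 3)]
ys = [("a", 3), ("c", None)]
result = [e for e, _ in local_subtract_by_key(
    [(e, True) for e in xs], [(e, True) for e in ys])]
print(sorted(result))  # [('a', 1), ('b', 4), ('b', 5)]

keyBy is the same trick in isolation: it is literally self.map(lambda x: (f(x), x)), which is why its doctest feeds straight into cogroup.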