aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/lib/py4j0.7.jarbin103286 -> 0 bytes
-rw-r--r--python/pyspark/java_gateway.py2
-rw-r--r--python/pyspark/rdd.py37
3 files changed, 38 insertions, 1 deletions
diff --git a/python/lib/py4j0.7.jar b/python/lib/py4j0.7.jar
deleted file mode 100644
index 73b7ddb7d1..0000000000
--- a/python/lib/py4j0.7.jar
+++ /dev/null
Binary files differ
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index fdc9851479..3ccf062c86 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,7 +29,7 @@ SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and SPARK_MEM settings from spark-env.sh
- command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
+ command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_function():
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1e9b3bb5c0..dfc518a7b0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -754,6 +754,43 @@ class RDD(object):
"""
return python_cogroup(self, other, numPartitions)
+ def subtractByKey(self, other, numPartitions=None):
+ """
+ Return each (key, value) pair in C{self} that has no pair with matching key
+ in C{other}.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
+ >>> y = sc.parallelize([("a", 3), ("c", None)])
+ >>> sorted(x.subtractByKey(y).collect())
+ [('b', 4), ('b', 5)]
+ """
+ filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
+ map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+ return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
+
+ def subtract(self, other, numPartitions=None):
+ """
+ Return each value in C{self} that is not contained in C{other}.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
+ >>> y = sc.parallelize([("a", 3), ("c", None)])
+ >>> sorted(x.subtract(y).collect())
+ [('a', 1), ('b', 4), ('b', 5)]
+ """
+ rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder
+ return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder
+
+ def keyBy(self, f):
+ """
+ Creates tuples of the elements in this RDD by applying C{f}.
+
+ >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
+ >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
+ >>> sorted(x.cogroup(y).collect())
+ [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
+ """
+ return self.map(lambda x: (f(x), x))
+
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those