aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py99
1 files changed, 92 insertions, 7 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1e9b3bb5c0..914118ccdd 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -21,6 +21,7 @@ from collections import defaultdict
from itertools import chain, ifilter, imap, product
import operator
import os
+import sys
import shlex
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
@@ -32,6 +33,7 @@ from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
+from pyspark.rddsampler import RDDSampler
from py4j.java_collections import ListConverter, MapConverter
@@ -165,14 +167,60 @@ class RDD(object):
.reduceByKey(lambda x, _: x) \
.map(lambda (x, _): x)
- # TODO: sampling needs to be re-implemented due to Batch
- #def sample(self, withReplacement, fraction, seed):
- # jrdd = self._jrdd.sample(withReplacement, fraction, seed)
- # return RDD(jrdd, self.ctx)
+ def sample(self, withReplacement, fraction, seed):
+ """
+ Return a sampled subset of this RDD (relies on numpy and falls back
+ on default random generator if numpy is unavailable).
+
+ >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
+ [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
+ """
+ return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
+
+ # this is ported from scala/spark/RDD.scala
+ def takeSample(self, withReplacement, num, seed):
+ """
+ Return a fixed-size sampled subset of this RDD (currently requires numpy).
+
+ >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
+ [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
+ """
+
+ fraction = 0.0
+ total = 0
+ multiplier = 3.0
+ initialCount = self.count()
+ maxSelected = 0
+
+ if (num < 0):
+ raise ValueError
+
+ if initialCount > sys.maxint - 1:
+ maxSelected = sys.maxint - 1
+ else:
+ maxSelected = initialCount
+
+ if num > initialCount and not withReplacement:
+ total = maxSelected
+ fraction = multiplier * (maxSelected + 1) / initialCount
+ else:
+ fraction = multiplier * (num + 1) / initialCount
+ total = num
- #def takeSample(self, withReplacement, num, seed):
- # vals = self._jrdd.takeSample(withReplacement, num, seed)
- # return [load_pickle(bytes(x)) for x in vals]
+ samples = self.sample(withReplacement, fraction, seed).collect()
+
+ # If the first sample didn't turn out large enough, keep trying to take samples;
+ # this shouldn't happen often because we use a big multiplier for their initial size.
+ # See: scala/spark/RDD.scala
+ while len(samples) < total:
+ if seed > sys.maxint - 2:
+ seed = -1
+ seed += 1
+ samples = self.sample(withReplacement, fraction, seed).collect()
+
+ sampler = RDDSampler(withReplacement, fraction, seed+1)
+ sampler.shuffle(samples)
+ return samples[0:total]
def union(self, other):
"""
@@ -754,6 +802,43 @@ class RDD(object):
"""
return python_cogroup(self, other, numPartitions)
+ def subtractByKey(self, other, numPartitions=None):
+ """
+ Return each (key, value) pair in C{self} that has no pair with matching key
+ in C{other}.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
+ >>> y = sc.parallelize([("a", 3), ("c", None)])
+ >>> sorted(x.subtractByKey(y).collect())
+ [('b', 4), ('b', 5)]
+ """
+ filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
+ map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+ return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
+
+ def subtract(self, other, numPartitions=None):
+ """
+ Return each value in C{self} that is not contained in C{other}.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
+ >>> y = sc.parallelize([("a", 3), ("c", None)])
+ >>> sorted(x.subtract(y).collect())
+ [('a', 1), ('b', 4), ('b', 5)]
+ """
+ rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder
+ return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder
+
+ def keyBy(self, f):
+ """
+ Creates tuples of the elements in this RDD by applying C{f}.
+
+ >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
+ >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
+ >>> sorted(x.cogroup(y).collect())
+ [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
+ """
+ return self.map(lambda x: (f(x), x))
+
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those