aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/pyspark/rdd.py10
-rw-r--r--python/pyspark/rddsampler.py99
2 files changed, 40 insertions, 69 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 50535d2711..57754776fa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -310,8 +310,11 @@ class RDD(object):
def sample(self, withReplacement, fraction, seed=None):
"""
- Return a sampled subset of this RDD (relies on numpy and falls back
- on default random generator if numpy is unavailable).
+ Return a sampled subset of this RDD.
+
+ >>> rdd = sc.parallelize(range(100), 4)
+ >>> rdd.sample(False, 0.1, 81).count()
+ 10
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
@@ -343,8 +346,7 @@ class RDD(object):
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
"""
- Return a fixed-size sampled subset of this RDD (currently requires
- numpy).
+ Return a fixed-size sampled subset of this RDD.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 558dcfd12d..459e142780 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -17,81 +17,48 @@
import sys
import random
+import math
class RDDSamplerBase(object):
def __init__(self, withReplacement, seed=None):
- try:
- import numpy
- self._use_numpy = True
- except ImportError:
- print >> sys.stderr, (
- "NumPy does not appear to be installed. "
- "Falling back to default random generator for sampling.")
- self._use_numpy = False
-
- self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
+ self._seed = seed if seed is not None else random.randint(0, sys.maxint)
self._withReplacement = withReplacement
self._random = None
- self._split = None
- self._rand_initialized = False
def initRandomGenerator(self, split):
- if self._use_numpy:
- import numpy
- self._random = numpy.random.RandomState(self._seed ^ split)
- else:
- self._random = random.Random(self._seed ^ split)
+ self._random = random.Random(self._seed ^ split)
# mixing because the initial seeds are close to each other
for _ in xrange(10):
self._random.randint(0, 1)
- self._split = split
- self._rand_initialized = True
-
- def getUniformSample(self, split):
- if not self._rand_initialized or split != self._split:
- self.initRandomGenerator(split)
-
- if self._use_numpy:
- return self._random.random_sample()
+ def getUniformSample(self):
+ return self._random.random()
+
+ def getPoissonSample(self, mean):
+ # Using Knuth's algorithm described in
+ # http://en.wikipedia.org/wiki/Poisson_distribution
+ if mean < 20.0:
+ # one exp and k+1 random calls
+ l = math.exp(-mean)
+ p = self._random.random()
+ k = 0
+ while p > l:
+ k += 1
+ p *= self._random.random()
else:
- return self._random.uniform(0.0, 1.0)
-
- def getPoissonSample(self, split, mean):
- if not self._rand_initialized or split != self._split:
- self.initRandomGenerator(split)
-
- if self._use_numpy:
- return self._random.poisson(mean)
- else:
- # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
- # drawing a sequence of numbers delta_j ~ Exp(mean)
- num_arrivals = 1
- cur_time = 0.0
-
- cur_time += self._random.expovariate(mean)
+ # switch to the log domain, k+1 expovariate (random + log) calls
+ p = self._random.expovariate(mean)
+ k = 0
+ while p < 1.0:
+ k += 1
+ p += self._random.expovariate(mean)
+ return k
- if cur_time > 1.0:
- return 0
-
- while(cur_time <= 1.0):
- cur_time += self._random.expovariate(mean)
- num_arrivals += 1
-
- return (num_arrivals - 1)
-
- def shuffle(self, vals):
- if self._random is None:
- self.initRandomGenerator(0) # this should only ever called on the master so
- # the split does not matter
-
- if self._use_numpy:
- self._random.shuffle(vals)
- else:
- self._random.shuffle(vals, self._random.random)
+ def func(self, split, iterator):
+ raise NotImplementedError
class RDDSampler(RDDSamplerBase):
@@ -101,17 +68,18 @@ class RDDSampler(RDDSamplerBase):
self._fraction = fraction
def func(self, split, iterator):
+ self.initRandomGenerator(split)
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
- count = self.getPoissonSample(split, mean=self._fraction)
+ count = self.getPoissonSample(self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
- if self.getUniformSample(split) <= self._fraction:
+ if self.getUniformSample() < self._fraction:
yield obj
@@ -119,13 +87,13 @@ class RDDRangeSampler(RDDSamplerBase):
def __init__(self, lowerBound, upperBound, seed=None):
RDDSamplerBase.__init__(self, False, seed)
- self._use_numpy = False # no performance gain from numpy
self._lowerBound = lowerBound
self._upperBound = upperBound
def func(self, split, iterator):
+ self.initRandomGenerator(split)
for obj in iterator:
- if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
+ if self._lowerBound <= self.getUniformSample() < self._upperBound:
yield obj
@@ -136,15 +104,16 @@ class RDDStratifiedSampler(RDDSamplerBase):
self._fractions = fractions
def func(self, split, iterator):
+ self.initRandomGenerator(split)
if self._withReplacement:
for key, val in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
- count = self.getPoissonSample(split, mean=self._fractions[key])
+ count = self.getPoissonSample(self._fractions[key])
for _ in range(0, count):
yield key, val
else:
for key, val in iterator:
- if self.getUniformSample(split) <= self._fractions[key]:
+ if self.getUniformSample() < self._fractions[key]:
yield key, val