diff options
author | Davies Liu <davies@databricks.com> | 2014-11-20 16:40:25 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-11-20 16:40:25 -0800 |
commit | d39f2e9c683a4ab78b29eb3c5668325bf8568e8c (patch) | |
tree | 7ee1e4035b25c5f9daf69a72641e512ace12afcd /python | |
parent | ad5f1f3ca240473261162c06ffc5aa70d15a5991 (diff) | |
download | spark-d39f2e9c683a4ab78b29eb3c5668325bf8568e8c.tar.gz spark-d39f2e9c683a4ab78b29eb3c5668325bf8568e8c.tar.bz2 spark-d39f2e9c683a4ab78b29eb3c5668325bf8568e8c.zip |
[SPARK-4477] [PySpark] remove numpy from RDDSampler
In RDDSampler, it try use numpy to gain better performance for possion(), but the number of call of random() is only (1+faction) * N in the pure python implementation of possion(), so there is no much performance gain from numpy.
numpy is not a dependent of pyspark, so it maybe introduce some problem, such as there is no numpy installed in slaves, but only installed master, as reported in SPARK-927.
It also complicate the code a lot, so we may should remove numpy from RDDSampler.
I also did some benchmark to verify that:
```
>>> from pyspark.mllib.random import RandomRDDs
>>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
>>> rdd.count() # cache it
>>> rdd.sample(True, 0.9).count() # measure this line
```
the results:
|withReplacement | random | numpy.random |
------- | ------------ | -------
|True | 1.5 s| 1.4 s|
|False| 0.6 s | 0.8 s|
closes #2313
Note: this patch including some commits that not mirrored to github, it will be OK after it catches up.
Author: Davies Liu <davies@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>
Closes #3351 from davies/numpy and squashes the following commits:
5c438d7 [Davies Liu] fix comment
c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477
98eb31b [Xiangrui Meng] make poisson sampling slightly faster
ee17d78 [Davies Liu] remove = for float
13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy
f583023 [Davies Liu] fix tests
51649f5 [Davies Liu] remove numpy in RDDSampler
78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/rdd.py | 10 | ||||
-rw-r--r-- | python/pyspark/rddsampler.py | 99 |
2 files changed, 40 insertions, 69 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 50535d2711..57754776fa 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -310,8 +310,11 @@ class RDD(object): def sample(self, withReplacement, fraction, seed=None): """ - Return a sampled subset of this RDD (relies on numpy and falls back - on default random generator if numpy is unavailable). + Return a sampled subset of this RDD. + + >>> rdd = sc.parallelize(range(100), 4) + >>> rdd.sample(False, 0.1, 81).count() + 10 """ assert fraction >= 0.0, "Negative fraction value: %s" % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) @@ -343,8 +346,7 @@ class RDD(object): # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed=None): """ - Return a fixed-size sampled subset of this RDD (currently requires - numpy). + Return a fixed-size sampled subset of this RDD. >>> rdd = sc.parallelize(range(0, 10)) >>> len(rdd.takeSample(True, 20, 1)) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 558dcfd12d..459e142780 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -17,81 +17,48 @@ import sys import random +import math class RDDSamplerBase(object): def __init__(self, withReplacement, seed=None): - try: - import numpy - self._use_numpy = True - except ImportError: - print >> sys.stderr, ( - "NumPy does not appear to be installed. " - "Falling back to default random generator for sampling.") - self._use_numpy = False - - self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1) + self._seed = seed if seed is not None else random.randint(0, sys.maxint) self._withReplacement = withReplacement self._random = None - self._split = None - self._rand_initialized = False def initRandomGenerator(self, split): - if self._use_numpy: - import numpy - self._random = numpy.random.RandomState(self._seed ^ split) - else: - self._random = random.Random(self._seed ^ split) + self._random = random.Random(self._seed ^ split) # mixing because the initial seeds are close to each other for _ in xrange(10): self._random.randint(0, 1) - self._split = split - self._rand_initialized = True - - def getUniformSample(self, split): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) - - if self._use_numpy: - return self._random.random_sample() + def getUniformSample(self): + return self._random.random() + + def getPoissonSample(self, mean): + # Using Knuth's algorithm described in + # http://en.wikipedia.org/wiki/Poisson_distribution + if mean < 20.0: + # one exp and k+1 random calls + l = math.exp(-mean) + p = self._random.random() + k = 0 + while p > l: + k += 1 + p *= self._random.random() else: - return self._random.uniform(0.0, 1.0) - - def getPoissonSample(self, split, mean): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) - - if self._use_numpy: - return self._random.poisson(mean) - else: - # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by - # drawing a sequence of numbers delta_j ~ Exp(mean) - num_arrivals = 1 - cur_time = 0.0 - - cur_time += self._random.expovariate(mean) + # switch to the log domain, k+1 expovariate (random + log) calls + p = self._random.expovariate(mean) + k = 0 + while p < 1.0: + k += 1 + p += self._random.expovariate(mean) + return k - if cur_time > 1.0: - return 0 - - while(cur_time <= 1.0): - cur_time += self._random.expovariate(mean) - num_arrivals += 1 - - return (num_arrivals - 1) - - def shuffle(self, vals): - if self._random is None: - self.initRandomGenerator(0) # this should only ever called on the master so - # the split does not matter - - if self._use_numpy: - self._random.shuffle(vals) - else: - self._random.shuffle(vals, self._random.random) + def func(self, split, iterator): + raise NotImplementedError class RDDSampler(RDDSamplerBase): @@ -101,17 +68,18 @@ class RDDSampler(RDDSamplerBase): self._fraction = fraction def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for obj in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. - count = self.getPoissonSample(split, mean=self._fraction) + count = self.getPoissonSample(self._fraction) for _ in range(0, count): yield obj else: for obj in iterator: - if self.getUniformSample(split) <= self._fraction: + if self.getUniformSample() < self._fraction: yield obj @@ -119,13 +87,13 @@ class RDDRangeSampler(RDDSamplerBase): def __init__(self, lowerBound, upperBound, seed=None): RDDSamplerBase.__init__(self, False, seed) - self._use_numpy = False # no performance gain from numpy self._lowerBound = lowerBound self._upperBound = upperBound def func(self, split, iterator): + self.initRandomGenerator(split) for obj in iterator: - if self._lowerBound <= self.getUniformSample(split) < self._upperBound: + if self._lowerBound <= self.getUniformSample() < self._upperBound: yield obj @@ -136,15 +104,16 @@ class RDDStratifiedSampler(RDDSamplerBase): self._fractions = fractions def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for key, val in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. - count = self.getPoissonSample(split, mean=self._fractions[key]) + count = self.getPoissonSample(self._fractions[key]) for _ in range(0, count): yield key, val else: for key, val in iterator: - if self.getUniformSample(split) <= self._fractions[key]: + if self.getUniformSample() < self._fractions[key]: yield key, val |