author     Davies Liu <davies@databricks.com>     2014-11-20 16:40:25 -0800
committer  Xiangrui Meng <meng@databricks.com>    2014-11-20 16:40:25 -0800
commit     d39f2e9c683a4ab78b29eb3c5668325bf8568e8c (patch)
tree       7ee1e4035b25c5f9daf69a72641e512ace12afcd /python
parent     ad5f1f3ca240473261162c06ffc5aa70d15a5991 (diff)
[SPARK-4477] [PySpark] remove numpy from RDDSampler
RDDSampler tries to use numpy to get better performance for poisson(), but the pure Python implementation of poisson() only makes (1 + fraction) * N calls to random(), so there is not much performance to gain from numpy. numpy is also not a dependency of PySpark, so relying on it can introduce problems, such as numpy being installed on the master but not on the slaves, as reported in SPARK-927. It also complicates the code a lot, so we should remove numpy from RDDSampler.

I also ran a benchmark to verify this:

```
>>> from pyspark.mllib.random import RandomRDDs
>>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
>>> rdd.count()  # cache it
>>> rdd.sample(True, 0.9).count()  # measure this line
```

The results:

| withReplacement | random | numpy.random |
| --------------- | ------ | ------------ |
| True            | 1.5 s  | 1.4 s        |
| False           | 0.6 s  | 0.8 s        |

Closes #2313.

Note: this patch includes some commits that are not yet mirrored to GitHub; it will be OK after the mirror catches up.

Author: Davies Liu <davies@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #3351 from davies/numpy and squashes the following commits:

5c438d7 [Davies Liu] fix comment
c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477
98eb31b [Xiangrui Meng] make poisson sampling slightly faster
ee17d78 [Davies Liu] remove = for float
13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy
f583023 [Davies Liu] fix tests
51649f5 [Davies Liu] remove numpy in RDDSampler
78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()
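For context, here is a minimal standalone sketch (written as Python 3 for illustration, while the patch itself targets Python 2) of the pure-Python Poisson draw this patch adopts: Knuth's product method for small means, and a log-domain variant otherwise. The function name `poisson_sample` and the harness at the bottom are illustrative; the patched code lives in `RDDSamplerBase.getPoissonSample` in the diff below.

```python
import math
import random

def poisson_sample(rng, mean):
    """Draw k ~ Poisson(mean) using only the standard-library RNG."""
    if mean < 20.0:
        # Knuth's product method: one exp() and k+1 random() calls,
        # which is why sampling N elements with a small fraction costs
        # roughly (1 + fraction) * N calls to random() in total.
        l = math.exp(-mean)
        p = rng.random()
        k = 0
        while p > l:
            k += 1
            p *= rng.random()
        return k
    # For large means exp(-mean) underflows, so switch to the log domain:
    # count Exp(mean) inter-arrival times until the total passes 1.0.
    p = rng.expovariate(mean)
    k = 0
    while p < 1.0:
        k += 1
        p += rng.expovariate(mean)
    return k

rng = random.Random(42)
draws = [poisson_sample(rng, 0.9) for _ in range(100000)]
print(sum(draws) / len(draws))  # sample mean, close to 0.9
```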
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/rdd.py         10
-rw-r--r--  python/pyspark/rddsampler.py  99
2 files changed, 40 insertions, 69 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 50535d2711..57754776fa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -310,8 +310,11 @@ class RDD(object):
def sample(self, withReplacement, fraction, seed=None):
"""
- Return a sampled subset of this RDD (relies on numpy and falls back
- on default random generator if numpy is unavailable).
+ Return a sampled subset of this RDD.
+
+ >>> rdd = sc.parallelize(range(100), 4)
+ >>> rdd.sample(False, 0.1, 81).count()
+ 10
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
@@ -343,8 +346,7 @@ class RDD(object):
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
"""
- Return a fixed-size sampled subset of this RDD (currently requires
- numpy).
+ Return a fixed-size sampled subset of this RDD.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 558dcfd12d..459e142780 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -17,81 +17,48 @@
import sys
import random
+import math
class RDDSamplerBase(object):
def __init__(self, withReplacement, seed=None):
- try:
- import numpy
- self._use_numpy = True
- except ImportError:
- print >> sys.stderr, (
- "NumPy does not appear to be installed. "
- "Falling back to default random generator for sampling.")
- self._use_numpy = False
-
- self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
+ self._seed = seed if seed is not None else random.randint(0, sys.maxint)
self._withReplacement = withReplacement
self._random = None
- self._split = None
- self._rand_initialized = False
def initRandomGenerator(self, split):
- if self._use_numpy:
- import numpy
- self._random = numpy.random.RandomState(self._seed ^ split)
- else:
- self._random = random.Random(self._seed ^ split)
+ self._random = random.Random(self._seed ^ split)
# mixing because the initial seeds are close to each other
for _ in xrange(10):
self._random.randint(0, 1)
- self._split = split
- self._rand_initialized = True
-
- def getUniformSample(self, split):
- if not self._rand_initialized or split != self._split:
- self.initRandomGenerator(split)
-
- if self._use_numpy:
- return self._random.random_sample()
+ def getUniformSample(self):
+ return self._random.random()
+
+ def getPoissonSample(self, mean):
+ # Using Knuth's algorithm described in
+ # http://en.wikipedia.org/wiki/Poisson_distribution
+ if mean < 20.0:
+ # one exp and k+1 random calls
+ l = math.exp(-mean)
+ p = self._random.random()
+ k = 0
+ while p > l:
+ k += 1
+ p *= self._random.random()
else:
- return self._random.uniform(0.0, 1.0)
-
- def getPoissonSample(self, split, mean):
- if not self._rand_initialized or split != self._split:
- self.initRandomGenerator(split)
-
- if self._use_numpy:
- return self._random.poisson(mean)
- else:
- # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
- # drawing a sequence of numbers delta_j ~ Exp(mean)
- num_arrivals = 1
- cur_time = 0.0
-
- cur_time += self._random.expovariate(mean)
+ # switch to the log domain, k+1 expovariate (random + log) calls
+ p = self._random.expovariate(mean)
+ k = 0
+ while p < 1.0:
+ k += 1
+ p += self._random.expovariate(mean)
+ return k
- if cur_time > 1.0:
- return 0
-
- while(cur_time <= 1.0):
- cur_time += self._random.expovariate(mean)
- num_arrivals += 1
-
- return (num_arrivals - 1)
-
- def shuffle(self, vals):
- if self._random is None:
- self.initRandomGenerator(0) # this should only ever called on the master so
- # the split does not matter
-
- if self._use_numpy:
- self._random.shuffle(vals)
- else:
- self._random.shuffle(vals, self._random.random)
+ def func(self, split, iterator):
+ raise NotImplementedError
class RDDSampler(RDDSamplerBase):
@@ -101,17 +68,18 @@ class RDDSampler(RDDSamplerBase):
self._fraction = fraction
def func(self, split, iterator):
+ self.initRandomGenerator(split)
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
- count = self.getPoissonSample(split, mean=self._fraction)
+ count = self.getPoissonSample(self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
- if self.getUniformSample(split) <= self._fraction:
+ if self.getUniformSample() < self._fraction:
yield obj
@@ -119,13 +87,13 @@ class RDDRangeSampler(RDDSamplerBase):
def __init__(self, lowerBound, upperBound, seed=None):
RDDSamplerBase.__init__(self, False, seed)
- self._use_numpy = False # no performance gain from numpy
self._lowerBound = lowerBound
self._upperBound = upperBound
def func(self, split, iterator):
+ self.initRandomGenerator(split)
for obj in iterator:
- if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
+ if self._lowerBound <= self.getUniformSample() < self._upperBound:
yield obj
@@ -136,15 +104,16 @@ class RDDStratifiedSampler(RDDSamplerBase):
self._fractions = fractions
def func(self, split, iterator):
+ self.initRandomGenerator(split)
if self._withReplacement:
for key, val in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
- count = self.getPoissonSample(split, mean=self._fractions[key])
+ count = self.getPoissonSample(self._fractions[key])
for _ in range(0, count):
yield key, val
else:
for key, val in iterator:
- if self.getUniformSample(split) <= self._fractions[key]:
+ if self.getUniformSample() < self._fractions[key]:
yield key, val
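Finally, a sketch of how the benchmark from the commit message might be reproduced after this patch, assuming a running PySpark shell where `sc` is already defined; absolute timings will of course vary by machine:

```python
import time

from pyspark.mllib.random import RandomRDDs

rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
rdd.count()  # materialize the cache so only sampling is timed

start = time.time()
rdd.sample(True, 0.9).count()   # with replacement: Poisson path
print("with replacement:    %.1f s" % (time.time() - start))

start = time.time()
rdd.sample(False, 0.9).count()  # without replacement: uniform path
print("without replacement: %.1f s" % (time.time() - start))
```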