author     Doris Xin <doris.s.xin@gmail.com>          2014-06-12 19:44:27 -0700
committer  Xiangrui Meng <meng@databricks.com>        2014-06-12 19:44:27 -0700
commit     1de1d703bf6b7ca14f7b40bbefe9bf6fd6c8ce47 (patch)
tree       f99459c7412db3dd9479037c41e5a4055853ae09 /python
parent     0154587ab71d1b864f97497dbb38bc52b87675be (diff)
SPARK-1939 Refactor takeSample method in RDD to use ScaSRS
Modified the takeSample method in RDD to use the ScaSRS sampling technique to improve performance. Added a private method that computes sampling rate > sample_size/total to ensure sufficient sample size with success rate >= 0.9999. Added a unit test for the private method to validate choice of sampling rate.

Author: Doris Xin <doris.s.xin@gmail.com>
Author: dorx <doris.s.xin@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #916 from dorx/takeSample and squashes the following commits:

5b061ae [Doris Xin] merge master
444e750 [Doris Xin] edge cases
3de882b [dorx] Merge pull request #2 from mengxr/SPARK-1939
82dde31 [Xiangrui Meng] update pyspark's takeSample
48d954d [Doris Xin] remove unused imports from RDDSuite
fb1452f [Doris Xin] allowing num to be greater than count in all cases
1481b01 [Doris Xin] washing test tubes and making coffee
dc699f3 [Doris Xin] give back imports removed by accident in rdd.py
64e445b [Doris Xin] logwarnning as soon as it enters the while loop
55518ed [Doris Xin] added TODO for logging in rdd.py
eff89e2 [Doris Xin] addressed reviewer comments.
ecab508 [Doris Xin] "fixed checkstyle violation
0a9b3e3 [Doris Xin] "reviewer comment addressed"
f80f270 [Doris Xin] Merge branch 'master' into takeSample
ae3ad04 [Doris Xin] fixed edge cases to prevent overflow
065ebcd [Doris Xin] Merge branch 'master' into takeSample
9bdd36e [Doris Xin] Check sample size and move computeFraction
e3fd6a6 [Doris Xin] Merge branch 'master' into takeSample
7cab53a [Doris Xin] fixed import bug in rdd.py
ffea61a [Doris Xin] SPARK-1939: Refactor takeSample method in RDD
1441977 [Doris Xin] SPARK-1939 Refactor takeSample method in RDD to use ScaSRS
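For reference, the sampling-rate rule described above can be restated as a small standalone sketch. The helper name compute_fraction below is illustrative only; in the patch itself the logic lives in the private RDD._computeFractionForSampleSize shown in the diff.

    from math import sqrt, log

    def compute_fraction(sample_size, total, with_replacement):
        # Naive rate: expected sample size is exactly sample_size.
        fraction = float(sample_size) / total
        if with_replacement:
            # Poisson draw per element: pad the rate by several standard
            # deviations (5 in general, 9 empirically for sample_size < 12).
            num_st_dev = 9 if sample_size < 12 else 5
            return fraction + num_st_dev * sqrt(fraction / total)
        # Binomial draw per element: pick q > p so that
        # Pr[sample < sample_size] < 0.0001 (delta = 0.00005).
        delta = 0.00005
        gamma = -log(delta) / total
        return min(1.0, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))

    # Sampling 10 of 1000 items without replacement oversamples at roughly 0.037
    # instead of the naive 0.01, so a single pass almost always returns enough rows.
    print(compute_fraction(10, 1000, False))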
Diffstat (limited to 'python')
-rw-r--r--    python/pyspark/rdd.py    167
1 file changed, 106 insertions(+), 61 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 735389c698..ddd22850a8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -31,6 +31,7 @@ from threading import Thread
import warnings
import heapq
from random import Random
+from math import sqrt, log
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
@@ -202,9 +203,9 @@ class RDD(object):
def persist(self, storageLevel):
"""
- Set this RDD's storage level to persist its values across operations after the first time
- it is computed. This can only be used to assign a new storage level if the RDD does not
- have a storage level set yet.
+ Set this RDD's storage level to persist its values across operations
+ after the first time it is computed. This can only be used to assign
+ a new storage level if the RDD does not have a storage level set yet.
"""
self.is_cached = True
javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
@@ -213,7 +214,8 @@ class RDD(object):
def unpersist(self):
"""
- Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ Mark the RDD as non-persistent, and remove all blocks for it from
+ memory and disk.
"""
self.is_cached = False
self._jrdd.unpersist()
@@ -357,48 +359,87 @@ class RDD(object):
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
"""
- Return a fixed-size sampled subset of this RDD (currently requires numpy).
+ Return a fixed-size sampled subset of this RDD (currently requires
+ numpy).
- >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
- [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
+ >>> rdd = sc.parallelize(range(0, 10))
+ >>> len(rdd.takeSample(True, 20, 1))
+ 20
+ >>> len(rdd.takeSample(False, 5, 2))
+ 5
+ >>> len(rdd.takeSample(False, 15, 3))
+ 10
"""
+ numStDev = 10.0
+
+ if num < 0:
+ raise ValueError("Sample size cannot be negative.")
+ elif num == 0:
+ return []
- fraction = 0.0
- total = 0
- multiplier = 3.0
initialCount = self.count()
- maxSelected = 0
+ if initialCount == 0:
+ return []
- if (num < 0):
- raise ValueError
+ rand = Random(seed)
- if (initialCount == 0):
- return list()
+ if (not withReplacement) and num >= initialCount:
+ # shuffle current RDD and return
+ samples = self.collect()
+ rand.shuffle(samples)
+ return samples
- if initialCount > sys.maxint - 1:
- maxSelected = sys.maxint - 1
- else:
- maxSelected = initialCount
-
- if num > initialCount and not withReplacement:
- total = maxSelected
- fraction = multiplier * (maxSelected + 1) / initialCount
- else:
- fraction = multiplier * (num + 1) / initialCount
- total = num
+ maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
+ if num > maxSampleSize:
+ raise ValueError("Sample size cannot be greater than %d." % maxSampleSize)
+ fraction = RDD._computeFractionForSampleSize(num, initialCount, withReplacement)
samples = self.sample(withReplacement, fraction, seed).collect()
# If the first sample didn't turn out large enough, keep trying to take samples;
# this shouldn't happen often because we use a big multiplier for their initial size.
# See: scala/spark/RDD.scala
- rand = Random(seed)
- while len(samples) < total:
- samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect()
-
- sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint))
- sampler.shuffle(samples)
- return samples[0:total]
+ while len(samples) < num:
+ # TODO: add log warning for when more than one iteration was run
+ seed = rand.randint(0, sys.maxint)
+ samples = self.sample(withReplacement, fraction, seed).collect()
+
+ rand.shuffle(samples)
+
+ return samples[0:num]
+
+ @staticmethod
+ def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
+ """
+ Returns a sampling rate that guarantees a sample of
+ size >= sampleSizeLowerBound 99.99% of the time.
+
+ How the sampling rate is determined:
+ Let p = num / total, where num is the sample size and total is the
+ total number of data points in the RDD. We're trying to compute
+ q > p such that
+ - when sampling with replacement, we're drawing each data point
+ with prob_i ~ Pois(q), where we want to guarantee
+ Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
+ total), i.e. the failure rate of not having a sufficiently large
+ sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
+ to guarantee 0.9999 success rate for num > 12, but we need a
+ slightly larger q (9 empirically determined).
+ - when sampling without replacement, we're drawing each data point
+ with prob_i ~ Binomial(total, fraction) and our choice of q
+ guarantees 1-delta, or 0.9999 success rate, where success rate is
+ defined the same as in sampling with replacement.
+ """
+ fraction = float(sampleSizeLowerBound) / total
+ if withReplacement:
+ numStDev = 5
+ if (sampleSizeLowerBound < 12):
+ numStDev = 9
+ return fraction + numStDev * sqrt(fraction / total)
+ else:
+ delta = 0.00005
+ gamma = - log(delta) / total
+ return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))
def union(self, other):
"""
@@ -422,8 +463,8 @@ class RDD(object):
def intersection(self, other):
"""
- Return the intersection of this RDD and another one. The output will not
- contain any duplicate elements, even if the input RDDs did.
+ Return the intersection of this RDD and another one. The output will
+ not contain any duplicate elements, even if the input RDDs did.
Note that this method performs a shuffle internally.
@@ -665,8 +706,8 @@ class RDD(object):
modify C{t2}.
The first function (seqOp) can return a different result type, U, than
- the type of this RDD. Thus, we need one operation for merging a T into an U
- and one operation for merging two U
+ the type of this RDD. Thus, we need one operation for merging a T into
+ an U and one operation for merging two U
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
@@ -759,8 +800,9 @@ class RDD(object):
def sampleStdev(self):
"""
- Compute the sample standard deviation of this RDD's elements (which corrects for bias in
- estimating the standard deviation by dividing by N-1 instead of N).
+ Compute the sample standard deviation of this RDD's elements (which
+ corrects for bias in estimating the standard deviation by dividing by
+ N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleStdev()
1.0
@@ -769,8 +811,8 @@ class RDD(object):
def sampleVariance(self):
"""
- Compute the sample variance of this RDD's elements (which corrects for bias in
- estimating the variance by dividing by N-1 instead of N).
+ Compute the sample variance of this RDD's elements (which corrects
+ for bias in estimating the variance by dividing by N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleVariance()
1.0
@@ -822,8 +864,8 @@ class RDD(object):
def takeOrdered(self, num, key=None):
"""
- Get the N elements from a RDD ordered in ascending order or as specified
- by the optional key function.
+ Get the N elements from a RDD ordered in ascending order or as
+ specified by the optional key function.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
@@ -912,8 +954,9 @@ class RDD(object):
def saveAsPickleFile(self, path, batchSize=10):
"""
- Save this RDD as a SequenceFile of serialized objects. The serializer used is
- L{pyspark.serializers.PickleSerializer}, default batch size is 10.
+ Save this RDD as a SequenceFile of serialized objects. The serializer
+ used is L{pyspark.serializers.PickleSerializer}, default batch size
+ is 10.
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
@@ -1195,9 +1238,10 @@ class RDD(object):
def foldByKey(self, zeroValue, func, numPartitions=None):
"""
- Merge the values for each key using an associative function "func" and a neutral "zeroValue"
- which may be added to the result an arbitrary number of times, and must not change
- the result (e.g., 0 for addition, or 1 for multiplication.).
+ Merge the values for each key using an associative function "func"
+ and a neutral "zeroValue" which may be added to the result an
+ arbitrary number of times, and must not change the result
+ (e.g., 0 for addition, or 1 for multiplication.).
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
@@ -1217,8 +1261,8 @@ class RDD(object):
Hash-partitions the resulting RDD with into numPartitions partitions.
Note: If you are grouping in order to perform an aggregation (such as a
- sum or average) over each key, using reduceByKey will provide much better
- performance.
+ sum or average) over each key, using reduceByKey will provide much
+ better performance.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
@@ -1278,8 +1322,8 @@ class RDD(object):
def cogroup(self, other, numPartitions=None):
"""
For each key k in C{self} or C{other}, return a resulting RDD that
- contains a tuple with the list of values for that key in C{self} as well
- as C{other}.
+ contains a tuple with the list of values for that key in C{self} as
+ well as C{other}.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
@@ -1290,8 +1334,8 @@ class RDD(object):
def subtractByKey(self, other, numPartitions=None):
"""
- Return each (key, value) pair in C{self} that has no pair with matching key
- in C{other}.
+ Return each (key, value) pair in C{self} that has no pair with matching
+ key in C{other}.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
@@ -1329,10 +1373,10 @@ class RDD(object):
"""
Return a new RDD that has exactly numPartitions partitions.
- Can increase or decrease the level of parallelism in this RDD. Internally, this uses
- a shuffle to redistribute data.
- If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
- which can avoid performing a shuffle.
+ Can increase or decrease the level of parallelism in this RDD.
+ Internally, this uses a shuffle to redistribute data.
+ If you are decreasing the number of partitions in this RDD, consider
+ using `coalesce`, which can avoid performing a shuffle.
>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
@@ -1357,9 +1401,10 @@ class RDD(object):
def zip(self, other):
"""
- Zips this RDD with another one, returning key-value pairs with the first element in each RDD
- second element in each RDD, etc. Assumes that the two RDDs have the same number of
- partitions and the same number of elements in each partition (e.g. one was made through
+ Zips this RDD with another one, returning key-value pairs with the
+ first element in each RDD second element in each RDD, etc. Assumes
+ that the two RDDs have the same number of partitions and the same
+ number of elements in each partition (e.g. one was made through
a map on the other).
>>> x = sc.parallelize(range(0,5))