From 1de1d703bf6b7ca14f7b40bbefe9bf6fd6c8ce47 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Thu, 12 Jun 2014 19:44:27 -0700 Subject: SPARK-1939 Refactor takeSample method in RDD to use ScaSRS Modified the takeSample method in RDD to use the ScaSRS sampling technique to improve performance. Added a private method that computes sampling rate > sample_size/total to ensure sufficient sample size with success rate >= 0.9999. Added a unit test for the private method to validate choice of sampling rate. Author: Doris Xin Author: dorx Author: Xiangrui Meng Closes #916 from dorx/takeSample and squashes the following commits: 5b061ae [Doris Xin] merge master 444e750 [Doris Xin] edge cases 3de882b [dorx] Merge pull request #2 from mengxr/SPARK-1939 82dde31 [Xiangrui Meng] update pyspark's takeSample 48d954d [Doris Xin] remove unused imports from RDDSuite fb1452f [Doris Xin] allowing num to be greater than count in all cases 1481b01 [Doris Xin] washing test tubes and making coffee dc699f3 [Doris Xin] give back imports removed by accident in rdd.py 64e445b [Doris Xin] logwarnning as soon as it enters the while loop 55518ed [Doris Xin] added TODO for logging in rdd.py eff89e2 [Doris Xin] addressed reviewer comments. ecab508 [Doris Xin] "fixed checkstyle violation 0a9b3e3 [Doris Xin] "reviewer comment addressed" f80f270 [Doris Xin] Merge branch 'master' into takeSample ae3ad04 [Doris Xin] fixed edge cases to prevent overflow 065ebcd [Doris Xin] Merge branch 'master' into takeSample 9bdd36e [Doris Xin] Check sample size and move computeFraction e3fd6a6 [Doris Xin] Merge branch 'master' into takeSample 7cab53a [Doris Xin] fixed import bug in rdd.py ffea61a [Doris Xin] SPARK-1939: Refactor takeSample method in RDD 1441977 [Doris Xin] SPARK-1939 Refactor takeSample method in RDD to use ScaSRS --- python/pyspark/rdd.py | 167 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 106 insertions(+), 61 deletions(-) (limited to 'python/pyspark/rdd.py') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 735389c698..ddd22850a8 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -31,6 +31,7 @@ from threading import Thread import warnings import heapq from random import Random +from math import sqrt, log from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ @@ -202,9 +203,9 @@ class RDD(object): def persist(self, storageLevel): """ - Set this RDD's storage level to persist its values across operations after the first time - it is computed. This can only be used to assign a new storage level if the RDD does not - have a storage level set yet. + Set this RDD's storage level to persist its values across operations + after the first time it is computed. This can only be used to assign + a new storage level if the RDD does not have a storage level set yet. """ self.is_cached = True javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) @@ -213,7 +214,8 @@ class RDD(object): def unpersist(self): """ - Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + Mark the RDD as non-persistent, and remove all blocks for it from + memory and disk. """ self.is_cached = False self._jrdd.unpersist() @@ -357,48 +359,87 @@ class RDD(object): # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed=None): """ - Return a fixed-size sampled subset of this RDD (currently requires numpy). 
+ Return a fixed-size sampled subset of this RDD (currently requires + numpy). - >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP - [4, 2, 1, 8, 2, 7, 0, 4, 1, 4] + >>> rdd = sc.parallelize(range(0, 10)) + >>> len(rdd.takeSample(True, 20, 1)) + 20 + >>> len(rdd.takeSample(False, 5, 2)) + 5 + >>> len(rdd.takeSample(False, 15, 3)) + 10 """ + numStDev = 10.0 + + if num < 0: + raise ValueError("Sample size cannot be negative.") + elif num == 0: + return [] - fraction = 0.0 - total = 0 - multiplier = 3.0 initialCount = self.count() - maxSelected = 0 + if initialCount == 0: + return [] - if (num < 0): - raise ValueError + rand = Random(seed) - if (initialCount == 0): - return list() + if (not withReplacement) and num >= initialCount: + # shuffle current RDD and return + samples = self.collect() + rand.shuffle(samples) + return samples - if initialCount > sys.maxint - 1: - maxSelected = sys.maxint - 1 - else: - maxSelected = initialCount - - if num > initialCount and not withReplacement: - total = maxSelected - fraction = multiplier * (maxSelected + 1) / initialCount - else: - fraction = multiplier * (num + 1) / initialCount - total = num + maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint)) + if num > maxSampleSize: + raise ValueError("Sample size cannot be greater than %d." % maxSampleSize) + fraction = RDD._computeFractionForSampleSize(num, initialCount, withReplacement) samples = self.sample(withReplacement, fraction, seed).collect() # If the first sample didn't turn out large enough, keep trying to take samples; # this shouldn't happen often because we use a big multiplier for their initial size. # See: scala/spark/RDD.scala - rand = Random(seed) - while len(samples) < total: - samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect() - - sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint)) - sampler.shuffle(samples) - return samples[0:total] + while len(samples) < num: + # TODO: add log warning for when more than one iteration was run + seed = rand.randint(0, sys.maxint) + samples = self.sample(withReplacement, fraction, seed).collect() + + rand.shuffle(samples) + + return samples[0:num] + + @staticmethod + def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement): + """ + Returns a sampling rate that guarantees a sample of + size >= sampleSizeLowerBound 99.99% of the time. + + How the sampling rate is determined: + Let p = num / total, where num is the sample size and total is the + total number of data points in the RDD. We're trying to compute + q > p such that + - when sampling with replacement, we're drawing each data point + with prob_i ~ Pois(q), where we want to guarantee + Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to + total), i.e. the failure rate of not having a sufficiently large + sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient + to guarantee 0.9999 success rate for num > 12, but we need a + slightly larger q (9 empirically determined). + - when sampling without replacement, we're drawing each data point + with prob_i ~ Binomial(total, fraction) and our choice of q + guarantees 1-delta, or 0.9999 success rate, where success rate is + defined the same as in sampling with replacement. 
+ """ + fraction = float(sampleSizeLowerBound) / total + if withReplacement: + numStDev = 5 + if (sampleSizeLowerBound < 12): + numStDev = 9 + return fraction + numStDev * sqrt(fraction / total) + else: + delta = 0.00005 + gamma = - log(delta) / total + return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction)) def union(self, other): """ @@ -422,8 +463,8 @@ class RDD(object): def intersection(self, other): """ - Return the intersection of this RDD and another one. The output will not - contain any duplicate elements, even if the input RDDs did. + Return the intersection of this RDD and another one. The output will + not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally. @@ -665,8 +706,8 @@ class RDD(object): modify C{t2}. The first function (seqOp) can return a different result type, U, than - the type of this RDD. Thus, we need one operation for merging a T into an U - and one operation for merging two U + the type of this RDD. Thus, we need one operation for merging a T into + an U and one operation for merging two U >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) @@ -759,8 +800,9 @@ class RDD(object): def sampleStdev(self): """ - Compute the sample standard deviation of this RDD's elements (which corrects for bias in - estimating the standard deviation by dividing by N-1 instead of N). + Compute the sample standard deviation of this RDD's elements (which + corrects for bias in estimating the standard deviation by dividing by + N-1 instead of N). >>> sc.parallelize([1, 2, 3]).sampleStdev() 1.0 @@ -769,8 +811,8 @@ class RDD(object): def sampleVariance(self): """ - Compute the sample variance of this RDD's elements (which corrects for bias in - estimating the variance by dividing by N-1 instead of N). + Compute the sample variance of this RDD's elements (which corrects + for bias in estimating the variance by dividing by N-1 instead of N). >>> sc.parallelize([1, 2, 3]).sampleVariance() 1.0 @@ -822,8 +864,8 @@ class RDD(object): def takeOrdered(self, num, key=None): """ - Get the N elements from a RDD ordered in ascending order or as specified - by the optional key function. + Get the N elements from a RDD ordered in ascending order or as + specified by the optional key function. >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) [1, 2, 3, 4, 5, 6] @@ -912,8 +954,9 @@ class RDD(object): def saveAsPickleFile(self, path, batchSize=10): """ - Save this RDD as a SequenceFile of serialized objects. The serializer used is - L{pyspark.serializers.PickleSerializer}, default batch size is 10. + Save this RDD as a SequenceFile of serialized objects. The serializer + used is L{pyspark.serializers.PickleSerializer}, default batch size + is 10. >>> tmpFile = NamedTemporaryFile(delete=True) >>> tmpFile.close() @@ -1195,9 +1238,10 @@ class RDD(object): def foldByKey(self, zeroValue, func, numPartitions=None): """ - Merge the values for each key using an associative function "func" and a neutral "zeroValue" - which may be added to the result an arbitrary number of times, and must not change - the result (e.g., 0 for addition, or 1 for multiplication.). + Merge the values for each key using an associative function "func" + and a neutral "zeroValue" which may be added to the result an + arbitrary number of times, and must not change the result + (e.g., 0 for addition, or 1 for multiplication.). 
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> from operator import add @@ -1217,8 +1261,8 @@ class RDD(object): Hash-partitions the resulting RDD with into numPartitions partitions. Note: If you are grouping in order to perform an aggregation (such as a - sum or average) over each key, using reduceByKey will provide much better - performance. + sum or average) over each key, using reduceByKey will provide much + better performance. >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) @@ -1278,8 +1322,8 @@ class RDD(object): def cogroup(self, other, numPartitions=None): """ For each key k in C{self} or C{other}, return a resulting RDD that - contains a tuple with the list of values for that key in C{self} as well - as C{other}. + contains a tuple with the list of values for that key in C{self} as + well as C{other}. >>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) @@ -1290,8 +1334,8 @@ class RDD(object): def subtractByKey(self, other, numPartitions=None): """ - Return each (key, value) pair in C{self} that has no pair with matching key - in C{other}. + Return each (key, value) pair in C{self} that has no pair with matching + key in C{other}. >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) >>> y = sc.parallelize([("a", 3), ("c", None)]) @@ -1329,10 +1373,10 @@ class RDD(object): """ Return a new RDD that has exactly numPartitions partitions. - Can increase or decrease the level of parallelism in this RDD. Internally, this uses - a shuffle to redistribute data. - If you are decreasing the number of partitions in this RDD, consider using `coalesce`, - which can avoid performing a shuffle. + Can increase or decrease the level of parallelism in this RDD. + Internally, this uses a shuffle to redistribute data. + If you are decreasing the number of partitions in this RDD, consider + using `coalesce`, which can avoid performing a shuffle. >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) >>> sorted(rdd.glom().collect()) [[1], [2, 3], [4, 5], [6, 7]] @@ -1357,9 +1401,10 @@ class RDD(object): def zip(self, other): """ - Zips this RDD with another one, returning key-value pairs with the first element in each RDD - second element in each RDD, etc. Assumes that the two RDDs have the same number of - partitions and the same number of elements in each partition (e.g. one was made through + Zips this RDD with another one, returning key-value pairs with the + first element in each RDD second element in each RDD, etc. Assumes + that the two RDDs have the same number of partitions and the same + number of elements in each partition (e.g. one was made through a map on the other). >>> x = sc.parallelize(range(0,5)) -- cgit v1.2.3
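
For readers skimming the patch, here is a standalone sketch of the sampling-rate computation it introduces. It mirrors the `_computeFractionForSampleSize` logic in the diff above; the snake_case names and the example values are illustrative, not part of the patch itself.

    from math import sqrt, log

    def compute_fraction_for_sample_size(sample_size_lower_bound, total, with_replacement):
        # Oversampling rate q > p = sample_size_lower_bound / total such that a
        # single pass of rdd.sample() returns at least the requested number of
        # elements with probability >= 0.9999.
        fraction = float(sample_size_lower_bound) / total
        if with_replacement:
            # Poisson sampling: q = p + numStDev * sqrt(p / total); 5 standard
            # deviations are enough once num >= 12, otherwise 9 (empirically chosen).
            num_std_dev = 5 if sample_size_lower_bound >= 12 else 9
            return fraction + num_std_dev * sqrt(fraction / total)
        else:
            # Bernoulli sampling: inflate p by gamma = -log(delta) / total so the
            # chance of an undersized sample stays below delta, capped at 1.0.
            delta = 0.00005
            gamma = -log(delta) / total
            return min(1.0, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))

    print(compute_fraction_for_sample_size(10, 1000, False))  # roughly 0.037

The design choice in both branches is the same: widen the requested rate p just enough (by a few Poisson standard deviations with replacement, or by gamma = -log(delta)/total without replacement) that an undersized first sample occurs with probability below 0.0001, so takeSample almost never has to re-sample.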
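
A quick way to sanity-check that guarantee locally, using the sketch above plus the standard `random` module (a Monte Carlo simulation of Bernoulli sampling; the trial counts and dataset size are illustrative and not from the patch):

    import random

    def simulate_success_rate(num, total, trials=2000, seed=42):
        # Draw `trials` Bernoulli samples of a `total`-element dataset at the
        # computed fraction and count how often the sample is large enough.
        fraction = compute_fraction_for_sample_size(num, total, with_replacement=False)
        rng = random.Random(seed)
        successes = 0
        for _ in range(trials):
            drawn = sum(1 for _ in range(total) if rng.random() < fraction)
            if drawn >= num:
                successes += 1
        return successes / float(trials)

    print(simulate_success_rate(10, 1000))  # expected to print 1.0, or very nearly so

This is why the while-loop added to takeSample is expected to take its resampling branch only rarely, and why the patch leaves a TODO to log a warning when it does.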