diff options
author | Hossein Falaki <falaki@gmail.com> | 2013-10-17 22:26:00 -0700 |
---|---|---|
committer | Hossein Falaki <falaki@gmail.com> | 2013-10-17 22:26:00 -0700 |
commit | ec5df800fdb0109314c0d5cd6dcac2ecbb9433d6 (patch) | |
tree | 6bb22cc0e486f3a1effdb4bf0e08008071257e4d /core/src | |
parent | 1a701358c0811c7f270132291e0646fd806e4984 (diff) | |
download | spark-ec5df800fdb0109314c0d5cd6dcac2ecbb9433d6.tar.gz spark-ec5df800fdb0109314c0d5cd6dcac2ecbb9433d6.tar.bz2 spark-ec5df800fdb0109314c0d5cd6dcac2ecbb9433d6.zip |
Added countDistinctByKey to PairRDDFunctions that counts the approximate number of unique values for each key in the RDD.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 51 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 30 |
2 files changed, 81 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 93b78e1232..f34593f0b6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -39,12 +39,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} +import com.clearspring.analytics.stream.cardinality.HyperLogLog + import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.Aggregator import org.apache.spark.Partitioner import org.apache.spark.Partitioner.defaultPartitioner +import org.apache.spark.util.SerializableHyperLogLog /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. @@ -207,6 +210,54 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) } /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard diviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * Partitioner to partition the output RDD. + */ + def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + val createHLL = (v: V) => { + val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) + val bres = hll.value.offer(v) + hll + } + val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => { + hll.value.offer(v) + hll + } + val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) + + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map { + case (k, v) => (k, v.value.cardinality()) + } + } + + /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard diviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. HashPartitions the + * output RDD into numPartitions. + * + */ + def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { + countDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) + } + + /** + * Return approximate number of distinct values for each key this RDD. + * The accuracy of approximation can be controlled through the relative standard diviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism + * level. + */ + def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { + countDistinctByKey(relativeSD, defaultPartitioner(self)) + } + + /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 57d3382ed0..d81bc8cb4c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -109,6 +109,36 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(deps.size === 2) // ShuffledRDD, ParallelCollection. } + test("countDistinctByKey") { + def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble + + /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is + only a statistical bound, the tests can fail for large values of relativeSD. We will be using + relatively tight error bounds to check correctness of functionality rather than checking + whether the approximation conforms with the requested bound. + */ + val relativeSD = 0.001 + + val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) + val rdd1 = sc.parallelize(stacked) + val counted1 = rdd1.countDistinctByKey(relativeSD).collect() + counted1.foreach{ + case(k, count) => assert(math.abs(error(count, k)) < relativeSD) + } + + import scala.util.Random + val rnd = new Random() + val randStacked = (1 to 100).flatMap{i => + val num = rnd.nextInt%500 + (1 to num).map(j => (num, j)) + } + val rdd2 = sc.parallelize(randStacked) + val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect() + counted2.foreach{ + case(k, count) => assert(math.abs(error(count, k)) < relativeSD) + } + } + test("join") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) |