authorHossein Falaki <falaki@gmail.com>2013-10-17 22:26:00 -0700
committerHossein Falaki <falaki@gmail.com>2013-10-17 22:26:00 -0700
commitec5df800fdb0109314c0d5cd6dcac2ecbb9433d6 (patch)
tree6bb22cc0e486f3a1effdb4bf0e08008071257e4d /core
parent1a701358c0811c7f270132291e0646fd806e4984 (diff)
Added countDistinctByKey to PairRDDFunctions; it counts the approximate number of unique values for each key in the RDD.
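For context, a minimal usage sketch of the new API (hypothetical driver code, not part of this patch; it assumes a local SparkContext and the implicit conversion to PairRDDFunctions from importing SparkContext._):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "countDistinctByKeyExample")

// Each key i carries exactly i distinct values, so the exact answer for key i is i.
val pairs = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))

// Approximate distinct values per key; 0.05 is the default relative standard deviation.
val counts: Array[(Int, Long)] = pairs.countDistinctByKey(0.05).collect()

sc.stop()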
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala      | 51
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 30
2 files changed, 81 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 93b78e1232..f34593f0b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -39,12 +39,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
+
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.Aggregator
import org.apache.spark.Partitioner
import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.util.SerializableHyperLogLog
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -207,6 +210,54 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
}
/**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint, and vice versa. Uses the provided
+ * Partitioner to partition the output RDD.
+ */
+ def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+   // Create a fresh HyperLogLog sketch for a key, seeded with its first value.
+   val createHLL = (v: V) => {
+     val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+     hll.value.offer(v)
+     hll
+   }
+   // Fold one more value into an existing sketch.
+   val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => {
+     hll.value.offer(v)
+     hll
+   }
+   // Merge the sketches built on different partitions.
+   val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+
+   combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map {
+     case (k, v) => (k, v.value.cardinality())
+   }
+ }
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint, and vice versa. Hash-partitions the
+ * output RDD into numPartitions partitions.
+ */
+ def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+ countDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+ }
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint, and vice versa. The default value of
+ * relativeSD is 0.05. Partitions the output RDD using the existing partitioner/parallelism
+ * level.
+ */
+ def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+ countDistinctByKey(relativeSD, defaultPartitioner(self))
+ }
+
+ /**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 57d3382ed0..d81bc8cb4c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -109,6 +109,36 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
}
+ test("countDistinctByKey") {
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+ /* Since HyperLogLog unique counting is approximate and the relative standard deviation is
+  * only a statistical bound, these assertions could fail for large values of relativeSD. We
+  * therefore use a relatively tight error bound to check correctness of the functionality,
+  * rather than whether the approximation conforms to the requested bound.
+  */
+ val relativeSD = 0.001
+
+ val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
+ val rdd1 = sc.parallelize(stacked)
+ val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
+ counted1.foreach { case (k, count) =>
+   assert(error(count, k) < relativeSD)
+ }
+
+ import scala.util.Random
+ val rnd = new Random()
+ val randStacked = (1 to 100).flatMap { i =>
+   // Draw a positive key so that (1 to num) is never empty.
+   val num = rnd.nextInt(500) + 1
+   (1 to num).map(j => (num, j))
+ }
+ val rdd2 = sc.parallelize(randStacked)
+ val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect()
+ counted2.foreach { case (k, count) =>
+   assert(error(count, k) < relativeSD)
+ }
+ }
+
test("join") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))