author    Hossein Falaki <falaki@gmail.com>    2013-10-17 22:24:48 -0700
committer Hossein Falaki <falaki@gmail.com>    2013-10-17 22:24:48 -0700
commit    1a701358c0811c7f270132291e0646fd806e4984 (patch)
tree      599c035a6e05a864420a3e1013a615619705ae57 /core
parent    843727af99786a45cf29352b4e05df92c6b3b6b9 (diff)
download  spark-1a701358c0811c7f270132291e0646fd806e4984.tar.gz
          spark-1a701358c0811c7f270132291e0646fd806e4984.tar.bz2
          spark-1a701358c0811c7f270132291e0646fd806e4984.zip
Added a countDistinct method to RDD that takes an accuracy parameter and returns the (approximate) number of distinct elements in the RDD.
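For orientation before the diff itself, here is a minimal usage sketch of the new method (the local SparkContext and app name are illustrative, not part of the patch):

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "countDistinct-demo")  // illustrative local context
    val rdd = sc.makeRDD(1 to 100000).map(_ % 100)            // exactly 100 distinct values

    rdd.countDistinct()      // default relativeSD = 0.05: estimate within ~5% of 100
    rdd.countDistinct(0.01)  // tighter error bound, at the cost of a larger counter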
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala       26
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala  13
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0355618e43..09932db5ea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaRDD
@@ -38,7 +39,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -766,6 +767,29 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Return the approximate number of distinct elements in the RDD.
+ *
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint, and vice versa. The default value of
+ * relativeSD is 0.05.
+ */
+ def countDistinct(relativeSD: Double = 0.05): Long = {
+
+ def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
+ val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+ while (iter.hasNext) {
+ val v = iter.next()
+ hllCounter.value.offer(v)
+ }
+ Iterator(hllCounter)
+ }
+ def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog): SerializableHyperLogLog = c1.merge(c2)
+
+ mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
+ }
+
+ /**
* Take the first num elements of the RDD. It works by first scanning one partition, and uses the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
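To make the mapPartitions/reduce pattern above concrete, here is a self-contained sketch of the same counting scheme using stream-lib's HyperLogLog directly (SerializableHyperLogLog is just a serializable wrapper around it; the hard-coded "partitions" below stand in for an RDD's partitions and are illustrative only):

    import com.clearspring.analytics.stream.cardinality.HyperLogLog

    // Three overlapping ranges standing in for an RDD's partitions.
    val partitions = Seq(1 to 400, 300 to 700, 600 to 1000)

    // One counter per partition, as in hllCountPartition above.
    val counters = partitions.map { part =>
      val hll = new HyperLogLog(0.05)   // relativeSD = 0.05, the default
      part.foreach(v => hll.offer(v))   // offer() hashes each element into the registers
      hll
    }

    // merge() unions the register sets, mirroring mergeCounters above.
    val merged = counters.reduce((c1, c2) => c1.merge(c2).asInstanceOf[HyperLogLog])
    println(merged.cardinality())       // prints roughly 1000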
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d1bc5e296..6baf9c7ece 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
+ test("Approximate distinct count") {
+
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+ val size = 100
+ val uniformDistro = for (i <- 1 to 100000) yield i % size
+ val simpleRdd = sc.makeRDD(uniformDistro)
+ assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
+ assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
+ assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
+ assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
+ }
+
test("SparkContext.union") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
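A closing note on the tradeoff these assertions exercise: for a classic HyperLogLog, the standard error with m registers is about 1.04 / sqrt(m), so a target relativeSD implies a register count as sketched below (back-of-the-envelope only; stream-lib's internal sizing may round differently or use a slightly different constant):

    // stddev ≈ 1.04 / sqrt(m) (Flajolet et al.), so a target relativeSD needs
    // m ≈ (1.04 / relativeSD)^2 registers, rounded up to a power of two.
    def registersFor(relativeSD: Double): Int = {
      val m = math.ceil(math.pow(1.04 / relativeSD, 2)).toInt
      Integer.highestOneBit(math.max(1, m - 1)) * 2  // next power of two >= m
    }
    // registersFor(0.05)  == 512      -- the default costs a few hundred bytes per counter
    // registersFor(0.001) == 2097152  -- 0.1% accuracy costs over a megabyte per counter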