Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala          51
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala          12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala          90
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                        53
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala  52
5 files changed, 149 insertions(+), 109 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 4c8f9ed6fb..7dcfbf741c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
/**
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. Uses the provided
- * Partitioner to partition the output RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param partitioner partitioner of the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, partitioner)
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
+ {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
}
/**
- * Return approximate number of distinct values for each key this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. The default value of
- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
- * level.
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param numPartitions number of partitions of the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD)
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
}
-
/**
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
- * output RDD into numPartitions.
*
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
*/
- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+ def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD))
}
/** Assign a name to this RDD */
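To make the Java-facing change concrete, here is a minimal usage sketch in Scala; the SparkContext `sc` and the sample data are assumptions, not part of this patch. The point of returning JavaPairRDD[K, Long] instead of JavaRDD[(K, Long)] is that Java callers cannot destructure Scala tuples, and the pair type keeps pair operations chainable on the result:

    import org.apache.spark.api.java.JavaPairRDD

    // Sketch (assumes an existing SparkContext named sc):
    val events = sc.parallelize(Seq(
      ("user1", "a"), ("user1", "b"), ("user1", "a"), ("user2", "c")))
    val javaPairs = JavaPairRDD.fromRDD(events)

    // relativeSD = 0.05 trades roughly 5% relative error for small per-key counters.
    val approx = javaPairs.countApproxDistinctByKey(0.05)

    // Pair operations remain available on the result, e.g. sorting by key:
    approx.sortByKey().collect()  // approximately [(user1, 2), (user2, 1)]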
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 619bfd75be..330569a8d8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return approximate number of distinct elements in the RDD.
*
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. The default value of
- * relativeSD is 0.05.
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
*/
- def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+ def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)
def name(): String = rdd.name
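The removal of the `= 0.05` default here is not cosmetic: Scala default arguments are not usable from Java, so the Java-facing trait should take relativeSD explicitly. A short sketch of the call after this change, with `sc` again an assumed SparkContext:

    import org.apache.spark.api.java.JavaRDD

    // Sketch (assumes an existing SparkContext named sc):
    val javaRdd = JavaRDD.fromRDD(sc.parallelize(1 to 100000))

    // relativeSD must now be passed explicitly through the Java API:
    val estimate: Long = javaRdd.countApproxDistinct(0.05)
    // expect an estimate within roughly 5% of 100000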
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 223fef7926..f2ce3cbd47 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.SerializableHyperLogLog
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * :: Experimental ::
+ *
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. Uses the provided
- * Partitioner to partition the output RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+ * triggers a sparse representation of registers, which may reduce memory consumption
+ * and increase accuracy when the cardinality is small.
+ *
+ * @param p The precision value for the normal set.
+ * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If `sp` equals 0, the sparse representation is skipped.
+ * @param partitioner Partitioner to use for the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
- val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
- val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
- val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+ @Experimental
+ def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
+ require(p >= 4, s"p ($p) must be >= 4")
+ require(sp <= 32, s"sp ($sp) must be <= 32")
+ require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+ val createHLL = (v: V) => {
+ val hll = new HyperLogLogPlus(p, sp)
+ hll.offer(v)
+ hll
+ }
+ val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
+ hll.offer(v)
+ hll
+ }
+ val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+ h1.addAll(h2)
+ h1
+ }
+
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
+ }
- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param partitioner partitioner of the resulting RDD
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
+ val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
+ assert(p <= 32)
+ countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
}
/**
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
- * output RDD into numPartitions.
*
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param numPartitions number of partitions of the resulting RDD
*/
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}
/**
- * Return approximate number of distinct values for each key this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. The default value of
- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
- * level.
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
*/
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
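A usage sketch contrasting the two overload families; the SparkContext `sc` and the data are assumptions. The experimental `(p, sp)` form sets HyperLogLog++ precision directly, while the `relativeSD` form derives `p` from the requested accuracy:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

    val visits = sc.parallelize(Seq(
      ("user1", "page1"), ("user1", "page2"), ("user1", "page1"), ("user2", "page3")))

    // Precision-based overload: p = 14 gives relative error of about
    // 1.054 / sqrt(2^14) ~= 0.0082; sp = 20 enables the sparse representation,
    // which helps while per-key cardinalities are still small.
    val byPrecision = visits.countApproxDistinctByKey(14, 20, new HashPartitioner(2))

    // Accuracy-based overload: relativeSD = 0.05 maps to p = ceil(2 * log2(1.054 / 0.05)) = 9.
    val byAccuracy = visits.countApproxDistinctByKey(0.05)
    byAccuracy.collect()  // approximately Array((user1, 2), (user2, 1))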
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index aa03e9276f..585b2f76af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -19,12 +19,11 @@ package org.apache.spark.rdd
import java.util.Random
-import scala.collection.Map
-import scala.collection.mutable
+import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
@@ -921,15 +920,49 @@ abstract class RDD[T: ClassTag](
* :: Experimental ::
* Return approximate number of distinct elements in the RDD.
*
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. The default value of
- * relativeSD is 0.05.
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+ * triggers a sparse representation of registers, which may reduce memory consumption
+ * and increase accuracy when the cardinality is small.
+ *
+ * @param p The precision value for the normal set.
+ * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If `sp` equals 0, the sparse representation is skipped.
*/
@Experimental
+ def countApproxDistinct(p: Int, sp: Int): Long = {
+    require(p >= 4, s"p ($p) must be >= 4")
+ require(sp <= 32, s"sp ($sp) cannot be greater than 32")
+ require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+ val zeroCounter = new HyperLogLogPlus(p, sp)
+ aggregate(zeroCounter)(
+ (hll: HyperLogLogPlus, v: T) => {
+ hll.offer(v)
+ hll
+ },
+ (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+ h1.addAll(h2)
+        h1
+ }).cardinality()
+ }
+
+ /**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ */
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
- val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
- aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
+    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
+    countApproxDistinct(if (p < 4) 4 else p, 0)
}
/**
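The relativeSD-to-p conversion above can be checked by hand, and it explains the 0.000017 lower bound: that is roughly `1.054 / sqrt(2^32)`, the error floor of HyperLogLog++'s maximum 32-bit precision. A sketch, where `precisionFor` is a hypothetical helper mirroring the conversion:

    // Hypothetical helper, same formula as in countApproxDistinct(relativeSD):
    def precisionFor(relativeSD: Double): Int =
      math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt

    precisionFor(0.05)      // 9:  1.054 / sqrt(2^9)  ~= 0.047, within the requested 0.05
    precisionFor(0.01)      // 14: 1.054 / sqrt(2^14) ~= 0.0082
    precisionFor(0.000017)  // 32: smaller values would need p > 32, hence the require() bound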
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
deleted file mode 100644
index 21a88eea3b..0000000000
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import com.clearspring.analytics.stream.cardinality.{HyperLogLog, ICardinality}
-
-/**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is
- * serializable.
- */
-private[spark]
-class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
-
- def this() = this(null) // For deserialization
-
- def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
-
- def add[T](elem: T) = {
- this.value.offer(elem)
- this
- }
-
- def readExternal(in: ObjectInput) {
- val byteLength = in.readInt()
- val bytes = new Array[Byte](byteLength)
- in.readFully(bytes)
- value = HyperLogLog.Builder.build(bytes)
- }
-
- def writeExternal(out: ObjectOutput) {
- val bytes = value.getBytes()
- out.writeInt(bytes.length)
- out.write(bytes)
- }
-}
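This deletion is the payoff of the switch: the wrapper existed only because the old HyperLogLog class could not be serialized directly, which Spark needs in order to ship counters in task closures and results. A round-trip sketch, under the assumption that HyperLogLogPlus implements java.io.Serializable in the stream-lib version this commit builds against:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      ObjectInputStream, ObjectOutputStream}
    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

    // Assumption: HyperLogLogPlus serializes directly, making the wrapper redundant.
    val hll = new HyperLogLogPlus(14, 0)
    (1 to 1000).foreach(i => hll.offer(i))

    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(hll)
    out.close()

    val copy = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[HyperLogLogPlus]

    // The estimate survives the round trip; at p = 14 the expected error is ~0.8%.
    assert(math.abs(copy.cardinality() - 1000L) < 50)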