author     Reynold Xin <rxin@apache.org>        2014-06-03 18:37:40 -0700
committer  Xiangrui Meng <meng@databricks.com>  2014-06-03 18:37:40 -0700
commit     1faef149f763f4a54aaa6e17043d0a628ae338a0 (patch)
tree       620cce43de43aca6df401bd910e259f335ae22eb
parent     21e40ed88bf2c205c3d7f947fde5d5a6f3e29f7f (diff)
download   spark-1faef149f763f4a54aaa6e17043d0a628ae338a0.tar.gz
           spark-1faef149f763f4a54aaa6e17043d0a628ae338a0.tar.bz2
           spark-1faef149f763f4a54aaa6e17043d0a628ae338a0.zip
SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog.
I also corrected some errors in the previous approximate HLL count API, including that relativeSD was not really a measure of error (yet we used it to check error bounds in the tests).

Author: Reynold Xin <rxin@apache.org>

Closes #897 from rxin/hll and squashes the following commits:

4d83f41 [Reynold Xin] New error bound and non-randomness.
f154ea0 [Reynold Xin] Added a comment on the value bound for testing.
e367527 [Reynold Xin] One more round of code review.
41e649a [Reynold Xin] Update final mima list.
9e320c8 [Reynold Xin] Incorporate code review feedback.
e110d70 [Reynold Xin] Merge branch 'master' into hll
354deb8 [Reynold Xin] Added comment on the Mima exclude rules.
acaa524 [Reynold Xin] Added the right exclude rules in MimaExcludes.
6555bfe [Reynold Xin] Added a default method and re-arranged MimaExcludes.
1db1522 [Reynold Xin] Excluded util.SerializableHyperLogLog from MIMA check.
9221b27 [Reynold Xin] Merge branch 'master' into hll
88cfe77 [Reynold Xin] Updated documentation and restored the old incorrect API to maintain API compatibility.
1294be6 [Reynold Xin] Updated HLL+.
e7786cb [Reynold Xin] Merge branch 'master' into hll
c0ef0c2 [Reynold Xin] SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog.
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala          | 51
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala          | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala          | 90
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                       | 53
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala  | 52
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                    | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala     | 22
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala                  |  6
-rw-r--r--  pom.xml                                                                   |  4
-rw-r--r--  project/MimaExcludes.scala                                                | 22
-rw-r--r--  project/SparkBuild.scala                                                  |  2
11 files changed, 189 insertions(+), 135 deletions(-)
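To make the API change concrete, below is a minimal usage sketch of the updated Scala API. The local-mode SparkContext setup and the sample data are illustrative assumptions, not part of this patch.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

    object ApproxDistinctExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("hll-plus-example").setMaster("local[2]"))

        // 100,000 values but only 100 distinct ones.
        val rdd = sc.parallelize(1 to 100000).map(_ % 100)

        // relativeSD is kept for source compatibility; it is now translated internally into a
        // HyperLogLogPlus precision p instead of being handed to streamlib directly.
        val global = rdd.countApproxDistinct(relativeSD = 0.05)

        // Per-key variant: key i has exactly i distinct values.
        val pairs = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))
        val perKey = pairs.countApproxDistinctByKey(0.05).collect()

        println(s"global estimate: $global (exact: 100), sample per-key estimate: ${perKey.head}")
        sc.stop()
      }
    }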
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 4c8f9ed6fb..7dcfbf741c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
/**
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. Uses the provided
- * Partitioner to partition the output RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param partitioner partitioner of the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, partitioner)
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
+ {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
}
/**
- * Return approximate number of distinct values for each key this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. The default value of
- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
- * level.
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param numPartitions number of partitions of the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD)
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
}
-
/**
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
- * output RDD into numPartitions.
*
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
*/
- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+ def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD))
}
/** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 619bfd75be..330569a8d8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return approximate number of distinct elements in the RDD.
*
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. The default value of
- * relativeSD is 0.05.
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
*/
- def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+ def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)
def name(): String = rdd.name
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 223fef7926..f2ce3cbd47 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.SerializableHyperLogLog
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * :: Experimental ::
+ *
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. Uses the provided
- * Partitioner to partition the output RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+ * would trigger sparse representation of registers, which may reduce the memory consumption
+ * and increase accuracy when the cardinality is small.
+ *
+ * @param p The precision value for the normal set.
+ * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If `sp` equals 0, the sparse representation is skipped.
+ * @param partitioner Partitioner to use for the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
- val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
- val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
- val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+ @Experimental
+ def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
+ require(p >= 4, s"p ($p) must be >= 4")
+ require(sp <= 32, s"sp ($sp) must be <= 32")
+ require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+ val createHLL = (v: V) => {
+ val hll = new HyperLogLogPlus(p, sp)
+ hll.offer(v)
+ hll
+ }
+ val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
+ hll.offer(v)
+ hll
+ }
+ val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+ h1.addAll(h2)
+ h1
+ }
+
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
+ }
- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param partitioner partitioner of the resulting RDD
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
+ val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
+ assert(p <= 32)
+ countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
}
/**
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
- * output RDD into numPartitions.
*
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ * @param numPartitions number of partitions of the resulting RDD
*/
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}
/**
- * Return approximate number of distinct values for each key this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. The default value of
- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
- * level.
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
*/
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
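For readers unfamiliar with streamlib, the following standalone sketch shows what the three functions passed to combineByKey above do, with no Spark involved. It assumes only stream 2.7.0 on the classpath; the p/sp values and sample ranges are arbitrary.

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

    object HllPlusCombineSketch {
      def main(args: Array[String]): Unit = {
        val p = 14   // precision of the normal (dense) register set
        val sp = 0   // 0 disables the sparse representation

        // createCombiner: start a counter from the first value seen for a key.
        def createHLL(v: Any): HyperLogLogPlus = {
          val hll = new HyperLogLogPlus(p, sp)
          hll.offer(v)
          hll
        }
        // mergeValue: fold another value for the same key into an existing counter.
        def mergeValueHLL(hll: HyperLogLogPlus, v: Any): HyperLogLogPlus = { hll.offer(v); hll }
        // mergeCombiners: merge two partial counters; the merged state ends up in h1.
        def mergeHLL(h1: HyperLogLogPlus, h2: HyperLogLogPlus): HyperLogLogPlus = { h1.addAll(h2); h1 }

        val left  = (1 to 5000).foldLeft(createHLL(0))(mergeValueHLL)
        val right = (2500 to 7500).foldLeft(createHLL(0))(mergeValueHLL)
        println(mergeHLL(left, right).cardinality())  // roughly 7501 distinct values (0 to 7500)
      }
    }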
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index aa03e9276f..585b2f76af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -19,12 +19,11 @@ package org.apache.spark.rdd
import java.util.Random
-import scala.collection.Map
-import scala.collection.mutable
+import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
@@ -921,15 +920,49 @@ abstract class RDD[T: ClassTag](
* :: Experimental ::
* Return approximate number of distinct elements in the RDD.
*
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vise versa. The default value of
- * relativeSD is 0.05.
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+ * would trigger sparse representation of registers, which may reduce the memory consumption
+ * and increase accuracy when the cardinality is small.
+ *
+ * @param p The precision value for the normal set.
+ * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If `sp` equals 0, the sparse representation is skipped.
*/
@Experimental
+ def countApproxDistinct(p: Int, sp: Int): Long = {
+ require(p >= 4, s"p ($p) must be >= 4")
+ require(sp <= 32, s"sp ($sp) cannot be greater than 32")
+ require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+ val zeroCounter = new HyperLogLogPlus(p, sp)
+ aggregate(zeroCounter)(
+ (hll: HyperLogLogPlus, v: T) => {
+ hll.offer(v)
+ hll
+ },
+ (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+ h1.addAll(h2)
+ h1
+ }).cardinality()
+ }
+
+ /**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It must be greater than 0.000017.
+ */
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
- val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
- aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+ val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
+ countApproxDistinct(p, 0)
}
/**
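The compatibility overload in the last hunk maps the old relativeSD parameter onto a precision p. As a quick sanity check of that conversion, here is a plain Scala calculation (no Spark or streamlib needed; the printed numbers are approximate):

    object PrecisionCheck extends App {
      // p = ceil(2 * log2(1.054 / relativeSD)), as in the hunk above.
      def precisionFor(relativeSD: Double): Int =
        math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt

      // Expected relative accuracy at precision p: roughly 1.054 / sqrt(2^p).
      def accuracyFor(p: Int): Double = 1.054 / math.sqrt(math.pow(2, p))

      val p = precisionFor(0.05)  // the default relativeSD of 0.05 maps to p = 9
      println(f"p = $p, expected accuracy ~ ${accuracyFor(p)}%.3f")  // ~ 0.047, within the requested 0.05
    }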
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
deleted file mode 100644
index 21a88eea3b..0000000000
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import com.clearspring.analytics.stream.cardinality.{HyperLogLog, ICardinality}
-
-/**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is
- * serializable.
- */
-private[spark]
-class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
-
- def this() = this(null) // For deserialization
-
- def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
-
- def add[T](elem: T) = {
- this.value.offer(elem)
- this
- }
-
- def readExternal(in: ObjectInput) {
- val byteLength = in.readInt()
- val bytes = new Array[Byte](byteLength)
- in.readFully(bytes)
- value = HyperLogLog.Builder.build(bytes)
- }
-
- def writeExternal(out: ObjectOutput) {
- val bytes = value.getBytes()
- out.writeInt(bytes.length)
- out.write(bytes)
- }
-}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 7193223add..b78309f81c 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1028,27 +1028,23 @@ public class JavaAPISuite implements Serializable {
arrayData.add(i % size);
}
JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
- Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
- Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
- Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
}
@Test
public void countApproxDistinctByKey() {
- double relativeSD = 0.001;
-
List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
for (int i = 10; i < 100; i++)
for (int j = 0; j < i; j++)
arrayData.add(new Tuple2<Integer, Integer>(i, j));
JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
- List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
+ List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(8, 0).collect();
for (Tuple2<Integer, Object> resItem : res) {
double count = (double)resItem._1();
Long resCount = (Long)resItem._2();
Double error = Math.abs((resCount - count) / count);
- Assert.assertTrue(error < relativeSD);
+ Assert.assertTrue(error < 0.1);
}
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 1230565ea5..9ddafc4518 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -119,28 +119,30 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
* relatively tight error bounds to check correctness of functionality rather than checking
* whether the approximation conforms with the requested bound.
*/
- val relativeSD = 0.001
+ val p = 20
+ val sp = 0
+ // When p = 20, the relative accuracy is about 0.001. So with high probability, the
+ // relative error should be smaller than the threshold 0.01 we use here.
+ val relativeSD = 0.01
// For each value i, there are i tuples with first element equal to i.
// Therefore, the expected count for key i would be i.
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
val rdd1 = sc.parallelize(stacked)
- val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
- counted1.foreach{
- case(k, count) => assert(error(count, k) < relativeSD)
- }
+ val counted1 = rdd1.countApproxDistinctByKey(p, sp).collect()
+ counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) }
- val rnd = new Random()
+ val rnd = new Random(42)
// The expected count for key num would be num
val randStacked = (1 to 100).flatMap { i =>
- val num = rnd.nextInt % 500
+ val num = rnd.nextInt() % 500
(1 to num).map(j => (num, j))
}
val rdd2 = sc.parallelize(randStacked)
- val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
- counted2.foreach{
- case(k, count) => assert(error(count, k) < relativeSD)
+ val counted2 = rdd2.countApproxDistinctByKey(relativeSD).collect()
+ counted2.foreach { case (k, count) =>
+ assert(error(count, k) < relativeSD, s"${error(count, k)} < $relativeSD")
}
}
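The comment above says p = 20 gives a relative accuracy of about 0.001; the same 1.054 / sqrt(2^p) rule of thumb also explains the looser thresholds used for p = 8 and p = 4 in RDDSuite below. A quick check in plain Scala:

    object AccuracyCheck extends App {
      def acc(p: Int): Double = 1.054 / math.sqrt(math.pow(2, p))
      println(acc(20))  // ~ 0.00103, an order of magnitude inside the 0.01 threshold asserted above
      println(acc(8))   // ~ 0.066, inside the 0.1 threshold asserted in RDDSuite
      println(acc(4))   // ~ 0.264, inside the 0.4 threshold asserted in RDDSuite
    }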
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e686068f7a..bbd0c14178 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -73,10 +73,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val size = 100
val uniformDistro = for (i <- 1 to 100000) yield i % size
val simpleRdd = sc.makeRDD(uniformDistro)
- assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
- assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
- assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
- assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
+ assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4)
+ assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1)
}
test("SparkContext.union") {
diff --git a/pom.xml b/pom.xml
index 0a5ca9e72a..fcd6f66b44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -300,9 +300,9 @@
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
- <version>2.5.1</version>
+ <version>2.7.0</version>
<exclusions>
- <!-- Only HyperLogLog is used, which doesn't depend on fastutil -->
+ <!-- Only HyperLogLogPlus is used, which doesn't depend on fastutil -->
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fc9cbeaec6..fadf6a4d8b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -16,7 +16,6 @@
*/
import com.typesafe.tools.mima.core._
-import com.typesafe.tools.mima.core.ProblemFilters._
/**
* Additional excludes for checking of Spark's binary compatibility.
@@ -35,8 +34,27 @@ object MimaExcludes {
val excludes =
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.1") =>
+ Seq(MimaBuild.excludeSparkPackage("graphx")) ++
Seq(
- MimaBuild.excludeSparkPackage("graphx"))
+ // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
+ // for countApproxDistinct* functions, which does not work in Java. We later removed
+ // them, and use the following to tell Mima to not care about them.
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1")
+ ) ++
+ MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")
case v if v.startsWith("1.0") =>
Seq(
MimaBuild.excludeSparkPackage("api.java"),
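The excluded `$default$1` methods come from Scala default parameter values, which are not usable from Java: the compiler emits a synthetic method holding the default, but a Java caller still has to pass every argument. A minimal illustration of the problem and of the explicit-overload pattern the Java API moves to (the class and method names here are placeholders, not Spark code):

    // Compiles to countApproxDistinct(double) plus a synthetic countApproxDistinct$default$1(),
    // so Java callers cannot actually omit the argument.
    class WithDefault {
      def countApproxDistinct(relativeSD: Double = 0.05): Long = 0L
    }

    // Java-friendly alternative: explicit overloads instead of a default value.
    class WithOverloads {
      def countApproxDistinct(relativeSD: Double): Long = 0L
      def countApproxDistinct(): Long = countApproxDistinct(0.05)
    }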
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c2a20d86b2..efb0b9319b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -362,7 +362,7 @@ object SparkBuild extends Build {
"com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
"com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
- "com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
+ "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), // Only HyperLogLogPlus is used, which does not depend on fastutil.
"org.spark-project" % "pyrolite" % "2.0.1",
"net.sf.py4j" % "py4j" % "0.8.1"
),