author    Hossein Falaki <falaki@gmail.com>    2013-12-30 19:28:03 -0800
committer Hossein Falaki <falaki@gmail.com>    2013-12-30 19:28:03 -0800
commit    a7de8e9b1c9859f45db4a620dd62a62d472d8396 (patch)
tree      6d9fc104d5ac4f4abb91bb43ef2a58bdb13c7bb7 /core
parent    d50ccc5ca9f9f0fa6418c88e7fbfb4a87b1a0e68 (diff)
Renamed countDistinct and countDistinctByKey methods to include Approx
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala           | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                        |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala      |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala                   | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala |  2
5 files changed, 15 insertions, 15 deletions
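
The rename makes the approximation explicit at every call site: both methods estimate distinct counts with a HyperLogLog sketch whose accuracy is governed by relativeSD. A minimal usage sketch of the renamed API (the local-mode setup and the dataset are illustrative, not part of this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

    val sc = new SparkContext("local", "approx-count-demo")

    // Approximate global distinct count; relativeSD = 0.05 trades ~5% error for a small sketch.
    val n = sc.parallelize(1 to 100000).map(_ % 100).countApproxDistinct(0.05)

    // Approximate distinct-value count per key; key i carries exactly i distinct values.
    val pairs = sc.parallelize(for (i <- 1 to 100; j <- 1 to i) yield (i, j))
    val perKey = pairs.countApproxDistinctByKey(0.05).collect()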
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4e4f860b19..1dc5f8d2f5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -217,7 +217,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* more accurate counts but increase the memory footprint and vice versa. Uses the provided
* Partitioner to partition the output RDD.
*/
- def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
val createHLL = (v: V) => {
val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
hll.value.offer(v)
@@ -242,8 +242,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* output RDD into numPartitions.
*
*/
- def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
- countDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}
/**
@@ -254,8 +254,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
* level.
*/
- def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
- countDistinctByKey(relativeSD, defaultPartitioner(self))
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
}
/**
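
The three renamed overloads above form a delegation chain: the numPartitions variant wraps a HashPartitioner, and the zero-argument variant falls back to relativeSD = 0.05 with defaultPartitioner(self). Call sites for each, sketched against a hypothetical pairs RDD like the one above:

    import org.apache.spark.HashPartitioner

    val byPartitioner = pairs.countApproxDistinctByKey(0.01, new HashPartitioner(8))
    val byNumParts    = pairs.countApproxDistinctByKey(0.01, 8)  // wraps HashPartitioner(8)
    val byDefaults    = pairs.countApproxDistinctByKey()         // relativeSD = 0.05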
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 136fa45327..74fab48619 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -797,7 +797,7 @@ abstract class RDD[T: ClassTag](
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05.
*/
- def countDistinct(relativeSD: Double = 0.05): Long = {
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = {
def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
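
The hunk is truncated here, but its shape is the standard sketch-and-merge pattern: one HyperLogLog per partition, every element offered to it, then a reduce that merges the partition sketches. An illustrative reconstruction (not the commit's exact code), assuming the SerializableHyperLogLog wrapper and stream-lib calls visible in the diff:

    import com.clearspring.analytics.stream.cardinality.HyperLogLog
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.SerializableHyperLogLog

    def approxDistinct[T](rdd: RDD[T], relativeSD: Double): Long = {
      val perPartition = rdd.mapPartitions { iter =>
        val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
        iter.foreach(v => hll.value.offer(v))  // hash each element into the sketch
        Iterator(hll)
      }
      // Merging two HyperLogLogs is a register-wise max, so the reduce is associative.
      perPartition.reduce(_.merge(_)).value.cardinality()
    }

The wrapper exists because stream-lib's HyperLogLog is not Java-serializable; without it the merged sketches could not be shipped back to the driver.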
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 6ad58b875d..5da538a1dd 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -110,7 +110,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
}
- test("countDistinctByKey") {
+ test("countApproxDistinctByKey") {
def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
/* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
@@ -124,7 +124,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
// Therefore, the expected count for key i would be i.
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
val rdd1 = sc.parallelize(stacked)
- val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
+ val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
counted1.foreach{
case(k, count) => assert(error(count, k) < relativeSD)
}
@@ -137,7 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
(1 to num).map(j => (num, j))
}
val rdd2 = sc.parallelize(randStacked)
- val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect()
+ val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
counted2.foreach{
case(k, count) => assert(error(count, k) < relativeSD)
}
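
To make the suite's acceptance bound concrete: an estimate passes when its relative deviation from the true distinct count stays below relativeSD. A plain-Scala worked example with illustrative numbers:

    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

    // Key 100 holds exactly 100 distinct values, so an estimate of 103 is a
    // 3% relative error and passes the suite's relativeSD = 0.05 bound.
    assert(error(103, 100) < 0.05)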
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 2f81b81797..1383359f85 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
- test("countDistinct") {
+ test("countApproxDistinct") {
def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
val size = 100
val uniformDistro = for (i <- 1 to 100000) yield i % size
val simpleRdd = sc.makeRDD(uniformDistro)
- assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
- assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
- assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
- assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
+ assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+ assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+ assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+ assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
}
test("SparkContext.union") {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 18529710fe..636e3ab913 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -173,7 +173,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("kryo with SerializableHyperLogLog") {
- assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3)
+ assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
}
test("kryo with reduce") {