From 79868fe7246d8e6d57e0a376b2593fabea9a9d83 Mon Sep 17 00:00:00 2001
From: Hossein Falaki
Date: Thu, 17 Oct 2013 23:39:20 -0700
Subject: Improved code style.

---
 .../scala/org/apache/spark/rdd/PairRDDFunctions.scala   |  2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctionsSuite.scala    | 18 +++++++++++-------
 .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala  | 12 ++++++------
 4 files changed, 19 insertions(+), 15 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index d778692f45..322b519bd2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -219,7 +219,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
   def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
     val createHLL = (v: V) => {
       val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-      val bres = hll.value.offer(v)
+      hll.value.offer(v)
       hll
     }
     val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 38fa96fd6d..e23e7a63a1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -784,7 +784,7 @@ abstract class RDD[T: ClassManifest](
       }
       Iterator(hllCounter)
     }
-    def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog): SerializableHyperLogLog = c1.merge(c2)
+    def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)
 
     mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index d81bc8cb4c..5683ada78a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
+import scala.util.Random
 
 import org.scalatest.FunSuite
 
@@ -110,15 +111,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   }
 
   test("countDistinctByKey") {
-    def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble
+    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
 
     /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
-    only a statistical bound, the tests can fail for large values of relativeSD. We will be using
-    relatively tight error bounds to check correctness of functionality rather than checking
-    whether the approximation conforms with the requested bound.
+     * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
+     * relatively tight error bounds to check correctness of functionality rather than checking
+     * whether the approximation conforms with the requested bound.
      */
     val relativeSD = 0.001
 
+    // For each value i, there are i tuples with first element equal to i.
+    // Therefore, the expected count for key i would be i.
     val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
     val rdd1 = sc.parallelize(stacked)
     val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
@@ -126,10 +129,11 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
       case(k, count) => assert(math.abs(error(count, k)) < relativeSD)
     }
 
-    import scala.util.Random
     val rnd = new Random()
-    val randStacked = (1 to 100).flatMap{i =>
-      val num = rnd.nextInt%500
+
+    // The expected count for key num would be num
+    val randStacked = (1 to 100).flatMap { i =>
+      val num = rnd.nextInt % 500
       (1 to num).map(j => (num, j))
     }
     val rdd2 = sc.parallelize(randStacked)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6baf9c7ece..413ea85322 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
-  test("Approximate distinct count") {
+  test("countDistinct") {
 
-    def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble
+    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
 
     val size = 100
     val uniformDistro = for (i <- 1 to 100000) yield i % size
     val simpleRdd = sc.makeRDD(uniformDistro)
-    assert( error(simpleRdd.countDistinct(0.2), size) < 0.2)
-    assert( error(simpleRdd.countDistinct(0.05), size) < 0.05)
-    assert( error(simpleRdd.countDistinct(0.01), size) < 0.01)
-    assert( error(simpleRdd.countDistinct(0.001), size) < 0.001)
+    assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
+    assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
+    assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
+    assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
   }
 
   test("SparkContext.union") {
-- 
cgit v1.2.3
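
For reference, the two approximate-counting APIs this patch touches can be driven end to end with a short standalone program mirroring the tests above. This is a minimal sketch, not part of the patch: the local-mode SparkContext setup and the object name are assumptions for illustration, while countDistinct, countDistinctByKey, and the error calculation come straight from the diff (Spark 0.8-era API, where the pair-RDD functions require the SparkContext._ import).

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._ // implicit conversion to PairRDDFunctions

    object CountDistinctSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "CountDistinctSketch")

        // RDD-level estimate: 100,000 values cycling through 100 distinct ones,
        // counted with a 1% relative standard deviation (as in RDDSuite above).
        val size = 100
        val simpleRdd = sc.makeRDD(for (i <- 1 to 100000) yield i % size)
        val est = simpleRdd.countDistinct(0.01)
        println("estimate = " + est + ", relative error = " +
          math.abs(est - size) / size.toDouble)

        // Per-key estimate: key i carries i distinct values, so each count
        // should land near its key (as in PairRDDFunctionsSuite above).
        val stacked = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))
        stacked.countDistinctByKey(0.01).collect().foreach {
          case (k, count) => println("key " + k + " -> ~" + count + " distinct values")
        }

        sc.stop()
      }
    }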