author     Hossein Falaki <falaki@gmail.com>    2013-10-17 23:39:20 -0700
committer  Hossein Falaki <falaki@gmail.com>    2013-10-17 23:39:20 -0700
commit     79868fe7246d8e6d57e0a376b2593fabea9a9d83 (patch)
tree       dbec9cf4160bd1e442714becefa592684d7a61f7 /core
parent     b611d9a65c0eda8ca7ceb015773ea4a4e26f2640 (diff)
Improved code style.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala      |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                   |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala              | 12
4 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index d778692f45..322b519bd2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -219,7 +219,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
val createHLL = (v: V) => {
val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
- val bres = hll.value.offer(v)
+ hll.value.offer(v)
hll
}
val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => {
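For context on the two helpers this hunk touches: createHLL and mergeValueHLL are the combiner functions behind countDistinctByKey, which keeps one HyperLogLog counter per key. Below is a minimal local sketch of that per-key pattern, assuming only stream-lib's HyperLogLog (the com.clearspring.analytics class that SerializableHyperLogLog wraps); the helper name distinctPerKey and the example data are illustrative, not part of the patch.

import com.clearspring.analytics.stream.cardinality.HyperLogLog

// Sketch only: fold (key, value) pairs into one HyperLogLog per key, mirroring
// createHLL (fresh counter for a new key) and mergeValueHLL (offer into an existing one).
def distinctPerKey[K, V](pairs: Seq[(K, V)], relativeSD: Double = 0.01): Map[K, Long] = {
  val counters = pairs.foldLeft(Map.empty[K, HyperLogLog]) { case (acc, (k, v)) =>
    val hll = acc.getOrElse(k, new HyperLogLog(relativeSD))
    hll.offer(v.asInstanceOf[AnyRef]) // offer() reports whether the estimate changed; ignored, as in the patch
    acc.updated(k, hll)
  }
  counters.map { case (k, hll) => k -> hll.cardinality() }
}

// distinctPerKey(Seq(("a", 1), ("a", 1), ("a", 2), ("b", 3))) gives approximately Map("a" -> 2, "b" -> 1)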
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 38fa96fd6d..e23e7a63a1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -784,7 +784,7 @@ abstract class RDD[T: ClassManifest](
}
Iterator(hllCounter)
}
- def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog): SerializableHyperLogLog = c1.merge(c2)
+ def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)
mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
}
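The simplification above sits inside countDistinct: build one HyperLogLog counter per partition with mapPartitions, merge the counters with reduce, and read the cardinality. A rough standalone sketch of that merge pattern, again assuming only stream-lib; approxDistinct and the sample data are illustrative, not Spark code.

import com.clearspring.analytics.stream.cardinality.{HyperLogLog, ICardinality}

// Sketch only: one counter per "partition", merged into a single estimate,
// mirroring mapPartitions(hllCountPartition).reduce(mergeCounters) above.
def approxDistinct(partitions: Seq[Seq[String]], relativeSD: Double = 0.01): Long = {
  val counters: Seq[ICardinality] = partitions.map { part =>
    val hll = new HyperLogLog(relativeSD)
    part.foreach(v => hll.offer(v)) // each element updates the partition's counter
    hll
  }
  counters.reduce((c1, c2) => c1.merge(c2)).cardinality()
}

// approxDistinct(Seq(Seq("a", "b"), Seq("b", "c"), Seq("c", "d"))) is close to 4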
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index d81bc8cb4c..5683ada78a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
+import scala.util.Random
import org.scalatest.FunSuite
@@ -110,15 +111,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
}
test("countDistinctByKey") {
- def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
/* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
- only a statistical bound, the tests can fail for large values of relativeSD. We will be using
- relatively tight error bounds to check correctness of functionality rather than checking
- whether the approximation conforms with the requested bound.
+ * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
+ * relatively tight error bounds to check correctness of functionality rather than checking
+ * whether the approximation conforms with the requested bound.
*/
val relativeSD = 0.001
+ // For each value i, there are i tuples with first element equal to i.
+ // Therefore, the expected count for key i would be i.
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
val rdd1 = sc.parallelize(stacked)
val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
@@ -126,10 +129,11 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
case(k, count) => assert(math.abs(error(count, k)) < relativeSD)
}
- import scala.util.Random
val rnd = new Random()
- val randStacked = (1 to 100).flatMap{i =>
- val num = rnd.nextInt%500
+
+ // The expected count for key num would be num
+ val randStacked = (1 to 100).flatMap { i =>
+ val num = rnd.nextInt % 500
(1 to num).map(j => (num, j))
}
val rdd2 = sc.parallelize(randStacked)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6baf9c7ece..413ea85322 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
- test("Approximate distinct count") {
+ test("countDistinct") {
- def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
val size = 100
val uniformDistro = for (i <- 1 to 100000) yield i % size
val simpleRdd = sc.makeRDD(uniformDistro)
- assert( error(simpleRdd.countDistinct(0.2), size) < 0.2)
- assert( error(simpleRdd.countDistinct(0.05), size) < 0.05)
- assert( error(simpleRdd.countDistinct(0.01), size) < 0.01)
- assert( error(simpleRdd.countDistinct(0.001), size) < 0.001)
+ assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
+ assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
+ assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
+ assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
}
test("SparkContext.union") {