aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHossein Falaki <falaki@gmail.com>2013-12-31 15:34:26 -0800
committerHossein Falaki <falaki@gmail.com>2013-12-31 15:34:26 -0800
commitacb0323053d270a377e497e975b2dfe59e2f997c (patch)
treec5224b86caf844423d8963749d01a4189cf3a1aa /core
parentd6cded7155b36880f81544bdf6fc6c20dd52ad7d (diff)
downloadspark-acb0323053d270a377e497e975b2dfe59e2f997c.tar.gz
spark-acb0323053d270a377e497e975b2dfe59e2f997c.tar.bz2
spark-acb0323053d270a377e497e975b2dfe59e2f997c.zip
minor improvements
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala4
2 files changed, 5 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1dc5f8d2f5..088b298aad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -229,9 +229,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map {
- case (k, v) => (k, v.value.cardinality())
- }
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 74fab48619..161fd067e1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -809,7 +809,9 @@ abstract class RDD[T: ClassTag](
}
def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)
- mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
+ val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+ mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters)
+ .value.cardinality()
}
/**