aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-01-01 13:28:54 -0500
committerMatei Zaharia <matei@databricks.com>2014-01-01 13:28:54 -0500
commit0e5b2adb5c4f2ab1bba275bb3cb2dc2643828f4f (patch)
treeda596b21b4e5cf4aa2cc998a254ca76af7be7637 /core/src/main
parent42bcfb2bb2b532dc12e13d3cfc1b4556bbb2c43c (diff)
parent9a0ff721c9e4c8f52aadfdde6ac2764d3cba9797 (diff)
downloadspark-0e5b2adb5c4f2ab1bba275bb3cb2dc2643828f4f.tar.gz
spark-0e5b2adb5c4f2ab1bba275bb3cb2dc2643828f4f.tar.bz2
spark-0e5b2adb5c4f2ab1bba275bb3cb2dc2643828f4f.zip
Merge remote-tracking branch 'apache/master' into conf2
Conflicts: project/SparkBuild.scala
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala42
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala50
5 files changed, 154 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 363667fa86..55c87450ac 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* Return an RDD with the values of each tuple.
*/
def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. Uses the provided
+ * Partitioner to partition the output RDD.
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD, partitioner)
+ }
+
+ /**
+ * Return approximate number of distinct values for each key this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+ * level.
+ */
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD)
+ }
+
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
+ * output RDD into numPartitions.
+ *
+ */
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+ }
}
object JavaPairRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f344804b4c..924d8af060 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
takeOrdered(num, comp)
}
+
+ /**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05.
+ */
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 48168e152e..04a8d05988 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
+
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.Aggregator
import org.apache.spark.Partitioner
import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.util.SerializableHyperLogLog
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -208,6 +211,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
/**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. Uses the provided
+ * Partitioner to partition the output RDD.
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
+ val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
+ val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+ }
+
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
+ * output RDD into numPartitions.
+ *
+ */
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+ }
+
+ /**
+ * Return approximate number of distinct values for each key this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+ * level.
+ */
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
+ }
+
+ /**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f8b1a6932e..6a7b0f8a86 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaRDD
@@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -790,6 +791,19 @@ abstract class RDD[T: ClassTag](
}
/**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05.
+ */
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+ val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+ aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+ }
+
+ /**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
new file mode 100644
index 0000000000..8b4e7c104c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{Externalizable, ObjectOutput, ObjectInput}
+import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
+
+/**
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ */
+private[spark]
+class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
+
+ def this() = this(null) // For deserialization
+
+ def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
+
+ def add[T](elem: T) = {
+ this.value.offer(elem)
+ this
+ }
+
+ def readExternal(in: ObjectInput) {
+ val byteLength = in.readInt()
+ val bytes = new Array[Byte](byteLength)
+ in.readFully(bytes)
+ value = HyperLogLog.Builder.build(bytes)
+ }
+
+ def writeExternal(out: ObjectOutput) {
+ val bytes = value.getBytes()
+ out.writeInt(bytes.length)
+ out.write(bytes)
+ }
+}