diff options
12 files changed, 457 insertions, 211 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 043f6cf68d..aac0a9d11e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -17,215 +17,219 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent</artifactId>
- <version>0.9.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project Core</name>
- <url>http://spark.incubator.apache.org/</url>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Core</name>
+ <url>http://spark.incubator.apache.org/</url>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </dependency>
- <dependency>
- <groupId>net.java.dev.jets3t</groupId>
- <artifactId>jets3t</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>com.ning</groupId>
- <artifactId>compress-lzf</artifactId>
- </dependency>
- <dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </dependency>
- <dependency>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm</artifactId>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>chill_${scala.binary.version}</artifactId>
- <version>0.3.1</version>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>chill-java</artifactId>
- <version>0.3.1</version>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-remote_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
- <groupId>net.liftweb</groupId>
- <artifactId>lift-json_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>it.unimi.dsi</groupId>
- <artifactId>fastutil</artifactId>
- </dependency>
- <dependency>
- <groupId>colt</groupId>
- <artifactId>colt</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.mesos</groupId>
- <artifactId>mesos</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-jvm</artifactId>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-json</artifactId>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-ganglia</artifactId>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-graphite</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.novocode</groupId>
- <artifactId>junit-interface</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <phase>test</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <exportAntProperties>true</exportAntProperties>
- <tasks>
- <property name="spark.classpath" refid="maven.test.classpath" />
- <property environment="env" />
- <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
- <condition>
- <not>
- <or>
- <isset property="env.SCALA_HOME" />
- <isset property="env.SCALA_LIBRARY_PATH" />
- </or>
- </not>
- </condition>
- </fail>
- </tasks>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <configuration>
- <environmentVariables>
- <SPARK_HOME>${basedir}/..</SPARK_HOME>
- </environmentVariables>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>compress-lzf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_${scala.binary.version}</artifactId>
+ <version>0.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-java</artifactId>
+ <version>0.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-remote_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.liftweb</groupId>
+ <artifactId>lift-json_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>colt</groupId>
+ <artifactId>colt</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.clearspring.analytics</groupId>
+ <artifactId>stream</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-ganglia</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <exportAntProperties>true</exportAntProperties>
+ <tasks>
+ <property name="spark.classpath" refid="maven.test.classpath" />
+ <property environment="env" />
+ <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
+ <condition>
+ <not>
+ <or>
+ <isset property="env.SCALA_HOME" />
+ <isset property="env.SCALA_LIBRARY_PATH" />
+ </or>
+ </not>
+ </condition>
+ </fail>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <SPARK_HOME>${basedir}/..</SPARK_HOME>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 363667fa86..55c87450ac 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* Return an RDD with the values of each tuple.
def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. Uses the provided
+ * Partitioner to partition the output RDD.
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD, partitioner)
+ }
+ /**
+ * Return approximate number of distinct values for each key this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+ * level.
+ */
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD)
+ }
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
+ * output RDD into numPartitions.
+ *
+ */
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
+ rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+ }
object JavaPairRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f344804b4c..924d8af060 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
takeOrdered(num, comp)
+ /**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05.
+ */
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 48168e152e..04a8d05988 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.Aggregator
import org.apache.spark.Partitioner
import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.util.SerializableHyperLogLog
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -208,6 +211,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. Uses the provided
+ * Partitioner to partition the output RDD.
+ */
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
+ val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
+ val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+ }
+ /**
+ * Return approximate number of distinct values for each key in this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
+ * output RDD into numPartitions.
+ *
+ */
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+ }
+ /**
+ * Return approximate number of distinct values for each key this RDD.
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+ * level.
+ */
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+ countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
+ }
+ /**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f8b1a6932e..6a7b0f8a86 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaRDD
@@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -790,6 +791,19 @@ abstract class RDD[T: ClassTag](
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The accuracy of approximation can be controlled through the relative standard deviation
+ * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+ * more accurate counts but increase the memory footprint and vise versa. The default value of
+ * relativeSD is 0.05.
+ */
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+ val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+ aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+ }
+ /**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
new file mode 100644
index 0000000000..8b4e7c104c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -0,0 +1,50 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+import java.io.{Externalizable, ObjectOutput, ObjectInput}
+import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ */
+class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
+ def this() = this(null) // For deserialization
+ def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
+ def add[T](elem: T) = {
+ this.value.offer(elem)
+ this
+ }
+ def readExternal(in: ObjectInput) {
+ val byteLength = in.readInt()
+ val bytes = new Array[Byte](byteLength)
+ in.readFully(bytes)
+ value = HyperLogLog.Builder.build(bytes)
+ }
+ def writeExternal(out: ObjectOutput) {
+ val bytes = value.getBytes()
+ out.writeInt(bytes.length)
+ out.write(bytes)
+ }
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 5e2899c97b..23ec6c3b31 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -930,4 +930,36 @@ public class JavaAPISuite implements Serializable {
+ @Test
+ public void countApproxDistinct() {
+ List<Integer> arrayData = new ArrayList<Integer>();
+ int size = 100;
+ for (int i = 0; i < 100000; i++) {
+ arrayData.add(i % size);
+ }
+ JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+ }
+ @Test
+ public void countApproxDistinctByKey() {
+ double relativeSD = 0.001;
+ List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
+ for (int i = 10; i < 100; i++)
+ for (int j = 0; j < i; j++)
+ arrayData.add(new Tuple2<Integer, Integer>(i, j));
+ JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
+ List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
+ for (Tuple2<Integer, Object> resItem : res) {
+ double count = (double)resItem._1();
+ Long resCount = (Long)resItem._2();
+ Double error = Math.abs((resCount - count) / count);
+ Assert.assertTrue(error < relativeSD);
+ }
+ }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 57d3382ed0..5da538a1dd 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
+import scala.util.Random
import org.scalatest.FunSuite
@@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
+ test("countApproxDistinctByKey") {
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+ /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
+ * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
+ * relatively tight error bounds to check correctness of functionality rather than checking
+ * whether the approximation conforms with the requested bound.
+ */
+ val relativeSD = 0.001
+ // For each value i, there are i tuples with first element equal to i.
+ // Therefore, the expected count for key i would be i.
+ val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
+ val rdd1 = sc.parallelize(stacked)
+ val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
+ counted1.foreach{
+ case(k, count) => assert(error(count, k) < relativeSD)
+ }
+ val rnd = new Random()
+ // The expected count for key num would be num
+ val randStacked = (1 to 100).flatMap { i =>
+ val num = rnd.nextInt % 500
+ (1 to num).map(j => (num, j))
+ }
+ val rdd2 = sc.parallelize(randStacked)
+ val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
+ counted2.foreach{
+ case(k, count) => assert(error(count, k) < relativeSD)
+ }
+ }
test("join") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index d8dcd6d14c..1383359f85 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
+ test("countApproxDistinct") {
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+ val size = 100
+ val uniformDistro = for (i <- 1 to 100000) yield i % size
+ val simpleRdd = sc.makeRDD(uniformDistro)
+ assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+ assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+ assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+ assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
+ }
test("SparkContext.union") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index d23e01418b..3898583275 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -173,6 +173,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
+ test("kryo with SerializableHyperLogLog") {
+ assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
+ }
test("kryo with reduce") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
diff --git a/pom.xml b/pom.xml
index 0936ae53b4..3a8eb882cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,11 @@
+ <dependency>
+ <groupId>com.clearspring.analytics</groupId>
+ <artifactId>stream</artifactId>
+ <version>2.4.0</version>
+ </dependency>
<!-- In theory we need not directly depend on protobuf since Spark does not directly
use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump
the protobuf version up from the one Mesos gives. For now we include this variable
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 204662f74f..2eef2dfa5e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -248,7 +248,8 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1",
- "com.typesafe" % "config" % "1.0.2"
+ "com.typesafe" % "config" % "1.0.2",
+ "com.clearspring.analytics" % "stream" % "2.5.1"