diff options
-rw-r--r-- | core/pom.xml | 422 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 36 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 11 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 42 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala | 50 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/JavaAPISuite.java | 32 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 34 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 13 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 4 | ||||
-rw-r--r-- | pom.xml | 5 | ||||
-rw-r--r-- | project/SparkBuild.scala | 3 |
12 files changed, 457 insertions, 211 deletions
diff --git a/core/pom.xml b/core/pom.xml index 043f6cf68d..aac0a9d11e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -17,215 +17,219 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent</artifactId> - <version>0.9.0-incubating-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> - <packaging>jar</packaging> - <name>Spark Project Core</name> - <url>http://spark.incubator.apache.org/</url> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project Core</name> + <url>http://spark.incubator.apache.org/</url> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </dependency> - <dependency> - <groupId>net.java.dev.jets3t</groupId> - <artifactId>jets3t</artifactId> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-server</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>com.ning</groupId> - <artifactId>compress-lzf</artifactId> - </dependency> - <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </dependency> - <dependency> - <groupId>org.ow2.asm</groupId> - <artifactId>asm</artifactId> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill_${scala.binary.version}</artifactId> - <version>0.3.1</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill-java</artifactId> - <version>0.3.1</version> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-remote_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-slf4j_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </dependency> - <dependency> - <groupId>net.liftweb</groupId> - <artifactId>lift-json_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>it.unimi.dsi</groupId> - <artifactId>fastutil</artifactId> - </dependency> - <dependency> - <groupId>colt</groupId> - <artifactId>colt</artifactId> - </dependency> - <dependency> - <groupId>org.apache.mesos</groupId> - <artifactId>mesos</artifactId> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> - <dependency> - <groupId>com.codahale.metrics</groupId> - <artifactId>metrics-core</artifactId> - </dependency> - <dependency> - <groupId>com.codahale.metrics</groupId> - <artifactId>metrics-jvm</artifactId> - </dependency> - <dependency> - <groupId>com.codahale.metrics</groupId> - <artifactId>metrics-json</artifactId> - </dependency> - <dependency> - <groupId>com.codahale.metrics</groupId> - <artifactId>metrics-ganglia</artifactId> - </dependency> - <dependency> - <groupId>com.codahale.metrics</groupId> - <artifactId>metrics-graphite</artifactId> - </dependency> - <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymock</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.novocode</groupId> - <artifactId>junit-interface</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <phase>test</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <exportAntProperties>true</exportAntProperties> - <tasks> - <property name="spark.classpath" refid="maven.test.classpath" /> - <property environment="env" /> - <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry."> - <condition> - <not> - <or> - <isset property="env.SCALA_HOME" /> - <isset property="env.SCALA_LIBRARY_PATH" /> - </or> - </not> - </condition> - </fail> - </tasks> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <configuration> - <environmentVariables> - <SPARK_HOME>${basedir}/..</SPARK_HOME> - <SPARK_TESTING>1</SPARK_TESTING> - <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH> - </environmentVariables> - </configuration> - </plugin> - </plugins> - </build> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + <dependency> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>com.ning</groupId> + <artifactId>compress-lzf</artifactId> + </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </dependency> + <dependency> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill_${scala.binary.version}</artifactId> + <version>0.3.1</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill-java</artifactId> + <version>0.3.1</version> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-remote_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-slf4j_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + <dependency> + <groupId>net.liftweb</groupId> + <artifactId>lift-json_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + </dependency> + <dependency> + <groupId>colt</groupId> + <artifactId>colt</artifactId> + </dependency> + <dependency> + <groupId>org.apache.mesos</groupId> + <artifactId>mesos</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + <dependency> + <groupId>com.clearspring.analytics</groupId> + <artifactId>stream</artifactId> + </dependency> + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-jvm</artifactId> + </dependency> + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-json</artifactId> + </dependency> + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-ganglia</artifactId> + </dependency> + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + </dependency> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>test</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <exportAntProperties>true</exportAntProperties> + <tasks> + <property name="spark.classpath" refid="maven.test.classpath" /> + <property environment="env" /> + <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry."> + <condition> + <not> + <or> + <isset property="env.SCALA_HOME" /> + <isset property="env.SCALA_LIBRARY_PATH" /> + </or> + </not> + </condition> + </fail> + </tasks> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <configuration> + <environmentVariables> + <SPARK_HOME>${basedir}/..</SPARK_HOME> + <SPARK_TESTING>1</SPARK_TESTING> + <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH> + </environmentVariables> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 363667fa86..55c87450ac 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K * Return an RDD with the values of each tuple. */ def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2)) + + /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * Partitioner to partition the output RDD. + */ + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD, partitioner) + } + + /** + * Return approximate number of distinct values for each key this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism + * level. + */ + def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD) + } + + + /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. HashPartitions the + * output RDD into numPartitions. + * + */ + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD, numPartitions) + } } object JavaPairRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index f344804b4c..924d8af060 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]] takeOrdered(num, comp) } + + /** + * Return approximate number of distinct elements in the RDD. + * + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. + */ + def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) + } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 48168e152e..04a8d05988 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} +import com.clearspring.analytics.stream.cardinality.HyperLogLog + import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.Aggregator import org.apache.spark.Partitioner import org.apache.spark.Partitioner.defaultPartitioner +import org.apache.spark.util.SerializableHyperLogLog /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. @@ -208,6 +211,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) } /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * Partitioner to partition the output RDD. + */ + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) + val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) + val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) + + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) + } + + /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. HashPartitions the + * output RDD into numPartitions. + * + */ + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { + countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) + } + + /** + * Return approximate number of distinct values for each key this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism + * level. + */ + def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { + countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) + } + + /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f8b1a6932e..6a7b0f8a86 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.TextOutputFormat import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.Partitioner._ import org.apache.spark.api.java.JavaRDD @@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{Utils, BoundedPriorityQueue} +import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog} import org.apache.spark.SparkContext._ import org.apache.spark._ @@ -790,6 +791,19 @@ abstract class RDD[T: ClassTag]( } /** + * Return approximate number of distinct elements in the RDD. + * + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. + */ + def countApproxDistinct(relativeSD: Double = 0.05): Long = { + val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) + aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() + } + + /** * Take the first num elements of the RDD. It works by first scanning one partition, and use the * results from that partition to estimate the number of additional partitions needed to satisfy * the limit. diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala new file mode 100644 index 0000000000..8b4e7c104c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.{Externalizable, ObjectOutput, ObjectInput} +import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog} + +/** + * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable. + */ +private[spark] +class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { + + def this() = this(null) // For deserialization + + def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) + + def add[T](elem: T) = { + this.value.offer(elem) + this + } + + def readExternal(in: ObjectInput) { + val byteLength = in.readInt() + val bytes = new Array[Byte](byteLength) + in.readFully(bytes) + value = HyperLogLog.Builder.build(bytes) + } + + def writeExternal(out: ObjectOutput) { + val bytes = value.getBytes() + out.writeInt(bytes.length) + out.write(bytes) + } +} diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 5e2899c97b..23ec6c3b31 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -930,4 +930,36 @@ public class JavaAPISuite implements Serializable { parts[1]); } + @Test + public void countApproxDistinct() { + List<Integer> arrayData = new ArrayList<Integer>(); + int size = 100; + for (int i = 0; i < 100000; i++) { + arrayData.add(i % size); + } + JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01); + } + + @Test + public void countApproxDistinctByKey() { + double relativeSD = 0.001; + + List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>(); + for (int i = 10; i < 100; i++) + for (int j = 0; j < i; j++) + arrayData.add(new Tuple2<Integer, Integer>(i, j)); + + JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData); + List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect(); + for (Tuple2<Integer, Object> resItem : res) { + double count = (double)resItem._1(); + Long resCount = (Long)resItem._2(); + Double error = Math.abs((resCount - count) / count); + Assert.assertTrue(error < relativeSD); + } + + } } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 57d3382ed0..5da538a1dd 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashSet +import scala.util.Random import org.scalatest.FunSuite @@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(deps.size === 2) // ShuffledRDD, ParallelCollection. } + test("countApproxDistinctByKey") { + def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble + + /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is + * only a statistical bound, the tests can fail for large values of relativeSD. We will be using + * relatively tight error bounds to check correctness of functionality rather than checking + * whether the approximation conforms with the requested bound. + */ + val relativeSD = 0.001 + + // For each value i, there are i tuples with first element equal to i. + // Therefore, the expected count for key i would be i. + val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) + val rdd1 = sc.parallelize(stacked) + val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect() + counted1.foreach{ + case(k, count) => assert(error(count, k) < relativeSD) + } + + val rnd = new Random() + + // The expected count for key num would be num + val randStacked = (1 to 100).flatMap { i => + val num = rnd.nextInt % 500 + (1 to num).map(j => (num, j)) + } + val rdd2 = sc.parallelize(randStacked) + val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect() + counted2.foreach{ + case(k, count) => assert(error(count, k) < relativeSD) + } + } + test("join") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index d8dcd6d14c..1383359f85 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } + test("countApproxDistinct") { + + def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble + + val size = 100 + val uniformDistro = for (i <- 1 to 100000) yield i % size + val simpleRdd = sc.makeRDD(uniformDistro) + assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2) + assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05) + assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01) + assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001) + } + test("SparkContext.union") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(sc.union(nums).collect().toList === List(1, 2, 3, 4)) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index d23e01418b..3898583275 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -173,6 +173,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11)) } + test("kryo with SerializableHyperLogLog") { + assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3) + } + test("kryo with reduce") { val control = 1 :: 2 :: Nil val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)) @@ -200,6 +200,11 @@ <artifactId>asm</artifactId> <version>4.0</version> </dependency> + <dependency> + <groupId>com.clearspring.analytics</groupId> + <artifactId>stream</artifactId> + <version>2.4.0</version> + </dependency> <!-- In theory we need not directly depend on protobuf since Spark does not directly use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump the protobuf version up from the one Mesos gives. For now we include this variable diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 204662f74f..2eef2dfa5e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -248,7 +248,8 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-graphite" % "3.0.0", "com.twitter" %% "chill" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1", - "com.typesafe" % "config" % "1.0.2" + "com.typesafe" % "config" % "1.0.2", + "com.clearspring.analytics" % "stream" % "2.5.1" ) ) |