From 843727af99786a45cf29352b4e05df92c6b3b6b9 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Thu, 17 Oct 2013 22:17:06 -0700 Subject: Added a serializable wrapper for HyperLogLog --- .../spark/util/SerializableHyperLogLog.scala | 44 ++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala new file mode 100644 index 0000000000..28a8accb33 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.{ObjectOutputStream, ObjectInputStream} +import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog} + +/** + * A wrapper around com.clearspring.analytics.stream.cardinality.HyperLogLog that is serializable. + */ +private[spark] +class SerializableHyperLogLog(@transient var value: ICardinality) extends Serializable { + + + def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) + + private def readObject(in: ObjectInputStream) { + val byteLength = in.readInt() + val bytes = new Array[Byte](byteLength) + in.readFully(bytes) + value = HyperLogLog.Builder.build(bytes) + } + + private def writeObject(out: ObjectOutputStream) { + val bytes = value.getBytes() + out.writeInt(bytes.length) + out.write(bytes) + } +} -- cgit v1.2.3 From 1a701358c0811c7f270132291e0646fd806e4984 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Thu, 17 Oct 2013 22:24:48 -0700 Subject: Added a countDistinct method to RDD that takes takes an accuracy parameter and returns the (approximate) number of distinct elements in the RDD. 
--- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 26 +++++++++++++++++++++- .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 13 +++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0355618e43..09932db5ea 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.TextOutputFormat import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.Partitioner._ import org.apache.spark.api.java.JavaRDD @@ -38,7 +39,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{Utils, BoundedPriorityQueue} +import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog} import org.apache.spark.SparkContext._ import org.apache.spark._ @@ -765,6 +766,29 @@ abstract class RDD[T: ClassManifest]( sc.runApproximateJob(this, countPartition, evaluator, timeout) } + /** + * Return approximate number of distinct elements in the RDD. + * + * The accuracy of approximation can be controlled through the relative standard diviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. + */ + def countDistinct(relativeSD: Double = 0.05): Long = { + + def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = { + val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) + while (iter.hasNext) { + val v = iter.next() + hllCounter.value.offer(v) + } + Iterator(hllCounter) + } + def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog): SerializableHyperLogLog = c1.merge(c2) + + mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality() + } + /** * Take the first num elements of the RDD. 
It works by first scanning one partition, and use the * results from that partition to estimate the number of additional partitions needed to satisfy diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 6d1bc5e296..6baf9c7ece 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } + test("Approximate distinct count") { + + def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble + + val size = 100 + val uniformDistro = for (i <- 1 to 100000) yield i % size + val simpleRdd = sc.makeRDD(uniformDistro) + assert( error(simpleRdd.countDistinct(0.2), size) < 0.2) + assert( error(simpleRdd.countDistinct(0.05), size) < 0.05) + assert( error(simpleRdd.countDistinct(0.01), size) < 0.01) + assert( error(simpleRdd.countDistinct(0.001), size) < 0.001) + } + test("SparkContext.union") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(sc.union(nums).collect().toList === List(1, 2, 3, 4)) -- cgit v1.2.3 From ec5df800fdb0109314c0d5cd6dcac2ecbb9433d6 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Thu, 17 Oct 2013 22:26:00 -0700 Subject: Added countDistinctByKey to PairRDDFunctions that counts the approximate number of unique values for each key in the RDD. --- .../org/apache/spark/rdd/PairRDDFunctions.scala | 51 ++++++++++++++++++++++ .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 30 +++++++++++++ 2 files changed, 81 insertions(+) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 93b78e1232..f34593f0b6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -39,12 +39,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} +import com.clearspring.analytics.stream.cardinality.HyperLogLog + import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.Aggregator import org.apache.spark.Partitioner import org.apache.spark.Partitioner.defaultPartitioner +import org.apache.spark.util.SerializableHyperLogLog /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. @@ -206,6 +209,54 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) self.map(_._1).countByValueApprox(timeout, confidence) } + /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard diviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * Partitioner to partition the output RDD. 
+ */ + def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + val createHLL = (v: V) => { + val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) + val bres = hll.value.offer(v) + hll + } + val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => { + hll.value.offer(v) + hll + } + val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) + + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map { + case (k, v) => (k, v.value.cardinality()) + } + } + + /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard diviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. HashPartitions the + * output RDD into numPartitions. + * + */ + def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { + countDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) + } + + /** + * Return approximate number of distinct values for each key this RDD. + * The accuracy of approximation can be controlled through the relative standard diviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism + * level. + */ + def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { + countDistinctByKey(relativeSD, defaultPartitioner(self)) + } + /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 57d3382ed0..d81bc8cb4c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -109,6 +109,36 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(deps.size === 2) // ShuffledRDD, ParallelCollection. } + test("countDistinctByKey") { + def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble + + /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is + only a statistical bound, the tests can fail for large values of relativeSD. We will be using + relatively tight error bounds to check correctness of functionality rather than checking + whether the approximation conforms with the requested bound. 
+ */ + val relativeSD = 0.001 + + val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) + val rdd1 = sc.parallelize(stacked) + val counted1 = rdd1.countDistinctByKey(relativeSD).collect() + counted1.foreach{ + case(k, count) => assert(math.abs(error(count, k)) < relativeSD) + } + + import scala.util.Random + val rnd = new Random() + val randStacked = (1 to 100).flatMap{i => + val num = rnd.nextInt%500 + (1 to num).map(j => (num, j)) + } + val rdd2 = sc.parallelize(randStacked) + val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect() + counted2.foreach{ + case(k, count) => assert(math.abs(error(count, k)) < relativeSD) + } + } + test("join") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) -- cgit v1.2.3 From b611d9a65c0eda8ca7ceb015773ea4a4e26f2640 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Thu, 17 Oct 2013 23:05:22 -0700 Subject: Fixed document typo --- core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6 +++--- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index f34593f0b6..d778692f45 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -211,7 +211,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard diviation + * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. Uses the provided * Partitioner to partition the output RDD. @@ -235,7 +235,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard diviation + * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. HashPartitions the * output RDD into numPartitions. @@ -247,7 +247,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) /** * Return approximate number of distinct values for each key this RDD. - * The accuracy of approximation can be controlled through the relative standard diviation + * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. The default value of * relativeSD is 0.05. 
Hash-partitions the output RDD using the existing partitioner/parallelism diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 09932db5ea..38fa96fd6d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -769,7 +769,7 @@ abstract class RDD[T: ClassManifest]( /** * Return approximate number of distinct elements in the RDD. * - * The accuracy of approximation can be controlled through the relative standard diviation + * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. The default value of * relativeSD is 0.05. -- cgit v1.2.3 From 79868fe7246d8e6d57e0a376b2593fabea9a9d83 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Thu, 17 Oct 2013 23:39:20 -0700 Subject: Improved code style. --- .../scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 18 +++++++++++------- .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 12 ++++++------ 4 files changed, 19 insertions(+), 15 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index d778692f45..322b519bd2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -219,7 +219,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { val createHLL = (v: V) => { val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) - val bres = hll.value.offer(v) + hll.value.offer(v) hll } val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 38fa96fd6d..e23e7a63a1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -784,7 +784,7 @@ abstract class RDD[T: ClassManifest]( } Iterator(hllCounter) } - def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog): SerializableHyperLogLog = c1.merge(c2) + def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2) mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality() } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index d81bc8cb4c..5683ada78a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashSet +import scala.util.Random import org.scalatest.FunSuite @@ -110,15 +111,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { } test("countDistinctByKey") { - def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble + def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble /* Since 
HyperLogLog unique counting is approximate, and the relative standard deviation is - only a statistical bound, the tests can fail for large values of relativeSD. We will be using - relatively tight error bounds to check correctness of functionality rather than checking - whether the approximation conforms with the requested bound. + * only a statistical bound, the tests can fail for large values of relativeSD. We will be using + * relatively tight error bounds to check correctness of functionality rather than checking + * whether the approximation conforms with the requested bound. */ val relativeSD = 0.001 + // For each value i, there are i tuples with first element equal to i. + // Therefore, the expected count for key i would be i. val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) val rdd1 = sc.parallelize(stacked) val counted1 = rdd1.countDistinctByKey(relativeSD).collect() @@ -126,10 +129,11 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { case(k, count) => assert(math.abs(error(count, k)) < relativeSD) } - import scala.util.Random val rnd = new Random() - val randStacked = (1 to 100).flatMap{i => - val num = rnd.nextInt%500 + + // The expected count for key num would be num + val randStacked = (1 to 100).flatMap { i => + val num = rnd.nextInt % 500 (1 to num).map(j => (num, j)) } val rdd2 = sc.parallelize(randStacked) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 6baf9c7ece..413ea85322 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } - test("Approximate distinct count") { + test("countDistinct") { - def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble + def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble val size = 100 val uniformDistro = for (i <- 1 to 100000) yield i % size val simpleRdd = sc.makeRDD(uniformDistro) - assert( error(simpleRdd.countDistinct(0.2), size) < 0.2) - assert( error(simpleRdd.countDistinct(0.05), size) < 0.05) - assert( error(simpleRdd.countDistinct(0.01), size) < 0.01) - assert( error(simpleRdd.countDistinct(0.001), size) < 0.001) + assert(error(simpleRdd.countDistinct(0.2), size) < 0.2) + assert(error(simpleRdd.countDistinct(0.05), size) < 0.05) + assert(error(simpleRdd.countDistinct(0.01), size) < 0.01) + assert(error(simpleRdd.countDistinct(0.001), size) < 0.001) } test("SparkContext.union") { -- cgit v1.2.3 From 13227aaa28ba7bb29b94a598b6efd45c7264d78b Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Fri, 18 Oct 2013 14:10:24 -0700 Subject: Added stream-lib dependency to Maven build --- core/pom.xml | 4 ++++ pom.xml | 5 +++++ 2 files changed, 9 insertions(+) (limited to 'core') diff --git a/core/pom.xml b/core/pom.xml index 8621d257e5..e53875c72d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -84,6 +84,10 @@ com.google.protobuf protobuf-java + + com.clearspring.analytics + stream + com.twitter chill_2.9.3 diff --git a/pom.xml b/pom.xml index 5ad7b1befb..334958382a 100644 --- a/pom.xml +++ b/pom.xml @@ -231,6 +231,11 @@ asm 4.0 + + com.clearspring.analytics + stream + 2.4.0 + com.google.protobuf protobuf-java -- cgit v1.2.3 From 2d511ab320a85eccafbb9e51a2183b07114bbaa1 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Fri, 18 Oct 2013 15:30:45 -0700 Subject: Made SerializableHyperLogLog Externalizable and added Kryo tests --- 
.../scala/org/apache/spark/util/SerializableHyperLogLog.scala | 11 ++++++----- .../org/apache/spark/serializer/KryoSerializerSuite.scala | 4 ++++ 2 files changed, 10 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala index 28a8accb33..9cfd41407f 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala @@ -17,26 +17,27 @@ package org.apache.spark.util -import java.io.{ObjectOutputStream, ObjectInputStream} +import java.io.{Externalizable, ObjectOutput, ObjectInput} import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog} /** - * A wrapper around com.clearspring.analytics.stream.cardinality.HyperLogLog that is serializable. + * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable. */ private[spark] -class SerializableHyperLogLog(@transient var value: ICardinality) extends Serializable { +class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { + def this() = this(null) // For deserialization def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) - private def readObject(in: ObjectInputStream) { + def readExternal(in: ObjectInput) { val byteLength = in.readInt() val bytes = new Array[Byte](byteLength) in.readFully(bytes) value = HyperLogLog.Builder.build(bytes) } - private def writeObject(out: ObjectOutputStream) { + def writeExternal(out: ObjectOutput) { val bytes = value.getBytes() out.writeInt(bytes.length) out.write(bytes) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c016c51171..18529710fe 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -172,6 +172,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11)) } + test("kryo with SerializableHyperLogLog") { + assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3) + } + test("kryo with reduce") { val control = 1 :: 2 :: Nil val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)) -- cgit v1.2.3 From 49bf47e1b792b82561b164f4f8006ddd4dd350ee Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Tue, 10 Dec 2013 19:50:50 -0800 Subject: Removed superfluous abs call from test cases. 
--- core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 5683ada78a..6ad58b875d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -126,7 +126,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val rdd1 = sc.parallelize(stacked) val counted1 = rdd1.countDistinctByKey(relativeSD).collect() counted1.foreach{ - case(k, count) => assert(math.abs(error(count, k)) < relativeSD) + case(k, count) => assert(error(count, k) < relativeSD) } val rnd = new Random() @@ -139,7 +139,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val rdd2 = sc.parallelize(randStacked) val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect() counted2.foreach{ - case(k, count) => assert(math.abs(error(count, k)) < relativeSD) + case(k, count) => assert(error(count, k) < relativeSD) } } -- cgit v1.2.3 From a7de8e9b1c9859f45db4a620dd62a62d472d8396 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Mon, 30 Dec 2013 19:28:03 -0800 Subject: Renamed countDistinct and countDistinctByKey methods to include Approx --- .../src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 10 +++++----- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 6 +++--- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 10 +++++----- .../org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 5 files changed, 15 insertions(+), 15 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 4e4f860b19..1dc5f8d2f5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -217,7 +217,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * more accurate counts but increase the memory footprint and vise versa. Uses the provided * Partitioner to partition the output RDD. */ - def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { val createHLL = (v: V) => { val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) hll.value.offer(v) @@ -242,8 +242,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * output RDD into numPartitions. * */ - def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { - countDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { + countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } /** @@ -254,8 +254,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism * level. 
*/ - def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { - countDistinctByKey(relativeSD, defaultPartitioner(self)) + def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { + countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 136fa45327..74fab48619 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -797,7 +797,7 @@ abstract class RDD[T: ClassTag]( * more accurate counts but increase the memory footprint and vise versa. The default value of * relativeSD is 0.05. */ - def countDistinct(relativeSD: Double = 0.05): Long = { + def countApproxDistinct(relativeSD: Double = 0.05): Long = { def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = { val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 6ad58b875d..5da538a1dd 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -110,7 +110,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(deps.size === 2) // ShuffledRDD, ParallelCollection. } - test("countDistinctByKey") { + test("countApproxDistinctByKey") { def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is @@ -124,7 +124,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { // Therefore, the expected count for key i would be i. 
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) val rdd1 = sc.parallelize(stacked) - val counted1 = rdd1.countDistinctByKey(relativeSD).collect() + val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect() counted1.foreach{ case(k, count) => assert(error(count, k) < relativeSD) } @@ -137,7 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { (1 to num).map(j => (num, j)) } val rdd2 = sc.parallelize(randStacked) - val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect() + val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect() counted2.foreach{ case(k, count) => assert(error(count, k) < relativeSD) } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 2f81b81797..1383359f85 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } - test("countDistinct") { + test("countApproxDistinct") { def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble val size = 100 val uniformDistro = for (i <- 1 to 100000) yield i % size val simpleRdd = sc.makeRDD(uniformDistro) - assert(error(simpleRdd.countDistinct(0.2), size) < 0.2) - assert(error(simpleRdd.countDistinct(0.05), size) < 0.05) - assert(error(simpleRdd.countDistinct(0.01), size) < 0.01) - assert(error(simpleRdd.countDistinct(0.001), size) < 0.001) + assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2) + assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05) + assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01) + assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001) } test("SparkContext.union") { diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 18529710fe..636e3ab913 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -173,7 +173,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("kryo with SerializableHyperLogLog") { - assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3) + assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3) } test("kryo with reduce") { -- cgit v1.2.3 From ed06500d300e93ae3129a035a364117adcb7d361 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Mon, 30 Dec 2013 19:30:42 -0800 Subject: Added Java API for countApproxDistinctByKey --- .../org/apache/spark/api/java/JavaPairRDD.scala | 36 ++++++++++++++++++++++ 1 file changed, 36 insertions(+) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 363667fa86..55c87450ac 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K * Return an RDD with the values of each tuple. */ def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2)) + + /** + * Return approximate number of distinct values for each key in this RDD. 
+ * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * Partitioner to partition the output RDD. + */ + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD, partitioner) + } + + /** + * Return approximate number of distinct values for each key this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism + * level. + */ + def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD) + } + + + /** + * Return approximate number of distinct values for each key in this RDD. + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. HashPartitions the + * output RDD into numPartitions. + * + */ + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD, numPartitions) + } } object JavaPairRDD { -- cgit v1.2.3 From c3073b6cf2a647451441e8dfc18fe4334497113c Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Mon, 30 Dec 2013 19:31:06 -0800 Subject: Added Java API for countApproxDistinct --- .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index f344804b4c..924d8af060 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]] takeOrdered(num, comp) } + + /** + * Return approximate number of distinct elements in the RDD. + * + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. 
+ */ + def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) + } -- cgit v1.2.3 From d6cded7155b36880f81544bdf6fc6c20dd52ad7d Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Mon, 30 Dec 2013 19:32:05 -0800 Subject: Added Java unit tests for countApproxDistinct and countApproxDistinctByKey --- .../test/scala/org/apache/spark/JavaAPISuite.java | 32 ++++++++++++++++++++++ 1 file changed, 32 insertions(+) (limited to 'core') diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 79913dc718..6398feb9f8 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -930,4 +930,36 @@ public class JavaAPISuite implements Serializable { parts[1]); } + @Test + public void countApproxDistinct() { + List arrayData = new ArrayList(); + int size = 100; + for (int i = 0; i < 100000; i++) { + arrayData.add(i % size); + } + JavaRDD simpleRdd = sc.parallelize(arrayData, 10); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01); + } + + @Test + public void countApproxDistinctByKey() { + double relativeSD = 0.001; + + List> arrayData = new ArrayList>(); + for (int i = 10; i < 100; i++) + for (int j = 0; j < i; j++) + arrayData.add(new Tuple2(i, j)); + + JavaPairRDD pairRdd = sc.parallelizePairs(arrayData); + List> res = pairRdd.countApproxDistinctByKey(relativeSD).collect(); + for (Tuple2 resItem : res) { + double count = (double)resItem._1(); + Long resCount = (Long)resItem._2(); + Double error = Math.abs((resCount - count) / count); + Assert.assertTrue(error < relativeSD); + } + + } } -- cgit v1.2.3 From acb0323053d270a377e497e975b2dfe59e2f997c Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Tue, 31 Dec 2013 15:34:26 -0800 Subject: minor improvements --- core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 5 ++--- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 1dc5f8d2f5..088b298aad 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -229,9 +229,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) } val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) - combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map { - case (k, v) => (k, v.value.cardinality()) - } + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) + } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 74fab48619..161fd067e1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -809,7 +809,9 @@ abstract class RDD[T: ClassTag]( } def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2) - mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality() + val zeroCounter = new 
SerializableHyperLogLog(new HyperLogLog(relativeSD)) + mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters) + .value.cardinality() } /** -- cgit v1.2.3 From bee445c927586136673f39259f23642a5a6e8efe Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Tue, 31 Dec 2013 16:58:18 -0800 Subject: Made the code more compact and readable --- .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 12 ++---------- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 14 +------------- .../org/apache/spark/util/SerializableHyperLogLog.scala | 5 +++++ 3 files changed, 8 insertions(+), 23 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 088b298aad..04a8d05988 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -218,19 +218,11 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Partitioner to partition the output RDD. */ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { - val createHLL = (v: V) => { - val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) - hll.value.offer(v) - hll - } - val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => { - hll.value.offer(v) - hll - } + val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) + val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) - } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 161fd067e1..4960e6e82f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -798,20 +798,8 @@ abstract class RDD[T: ClassTag]( * relativeSD is 0.05. 
*/ def countApproxDistinct(relativeSD: Double = 0.05): Long = { - - def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = { - val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) - while (iter.hasNext) { - val v = iter.next() - hllCounter.value.offer(v) - } - Iterator(hllCounter) - } - def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2) - val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) - mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters) - .value.cardinality() + aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() } /** diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala index 9cfd41407f..8b4e7c104c 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala @@ -30,6 +30,11 @@ class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) + def add[T](elem: T) = { + this.value.offer(elem) + this + } + def readExternal(in: ObjectInput) { val byteLength = in.readInt() val bytes = new Array[Byte](byteLength) -- cgit v1.2.3 From b5d0b3b0f7fc823d2724464fb8bbf8fef5b36d30 Mon Sep 17 00:00:00 2001 From: liguoqiang Date: Wed, 1 Jan 2014 11:30:08 +0800 Subject: restore core/pom.xml file modification --- core/pom.xml | 1586 +++++++++------------------------------------------------- pom.xml | 10 +- 2 files changed, 240 insertions(+), 1356 deletions(-) (limited to 'core') diff --git a/core/pom.xml b/core/pom.xml index f0248bc539..aac0a9d11e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -1,1351 +1,235 @@ - - - - - - - - - incubator-spark/core/pom.xml at master · apache/incubator-spark · GitHub - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[remaining core/pom.xml diff body omitted: the 1,351 deleted lines are a saved copy of the GitHub page "incubator-spark/core/pom.xml at master" (page chrome plus a rendered copy of the old POM) that had accidentally been committed in place of the file; the 235 added lines restore the proper Maven POM for spark-core_2.10, carrying its existing dependencies plus the newly added com.clearspring.analytics:stream dependency.]
diff --git a/pom.xml b/pom.xml
index 6545c82b31..3a8eb882cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,16 +200,16 @@ asm 4.0
- com.clearspring.analytics stream 2.4.0 + com.google.protobuf protobuf-java -- cgit v1.2.3
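Taken together, the series above adds countApproxDistinct(relativeSD) on RDD and countApproxDistinctByKey(relativeSD, ...) on pair RDDs (with matching Java wrappers), all backed by the SerializableHyperLogLog wrapper. The sketch below is an illustrative usage example only, not part of the patches; it assumes a running SparkContext named sc (for example in spark-shell) and the usual SparkContext._ implicits.

// Illustrative usage sketch (not part of the patches above); assumes a live SparkContext `sc`.
import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

// ~100 distinct values hidden in 100,000 elements.
val ids = sc.parallelize(1 to 100000).map(_ % 100)
val approx: Long = ids.countApproxDistinct(0.01)   // close to 100; 0.01 is the relative std. deviation

// For key i there are exactly i distinct values, so each estimate should land near its key.
val pairs = sc.parallelize(for (i <- 1 to 100; j <- 1 to i) yield (i, j))
val perKey: Array[(Int, Long)] = pairs.countApproxDistinctByKey(0.01).collect()

Lower relativeSD values give tighter estimates at the cost of a larger HyperLogLog sketch per partition (and per key in the keyed variant), which is the accuracy/memory trade-off the method documentation describes.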