diff options
author | Holden Karau <holden@pigscanfly.ca> | 2013-10-18 00:07:49 -0700 |
---|---|---|
committer | Holden Karau <holden@pigscanfly.ca> | 2013-10-19 00:57:25 -0700 |
commit | 2a37235825cecd3f75286d11456c6e3cb13d4327 (patch) | |
tree | 9b50c0a07695b9cfb5a371e785a43b7b7ed1988b /core/src/main/scala/org | |
parent | dca80094d317363e1e0d7e32bc7dfd99faf943cf (diff) | |
download | spark-2a37235825cecd3f75286d11456c6e3cb13d4327.tar.gz spark-2a37235825cecd3f75286d11456c6e3cb13d4327.tar.bz2 spark-2a37235825cecd3f75286d11456c6e3cb13d4327.zip |
Initial commit of adding histogram functionality to the DoubleRDDFunctions.
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 32 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala | 134 |
2 files changed, 166 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index 5fd1fab580..d2a2818e59 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -26,6 +26,8 @@ import org.apache.spark.storage.StorageLevel import java.lang.Double import org.apache.spark.Partitioner +import scala.collection.JavaConverters._ + class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] { override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]] @@ -158,6 +160,36 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav /** (Experimental) Approximate operation to return the sum within a timeout. */ def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout) + + /** + * Compute a histogram of the data using bucketCount number of buckets evenly + * spaced between the minimum and maximum of the RDD. For example if the min + * value is 0 and the max is 100 and there are two buckets the resulting + * buckets will be [0,50) [50,100]. bucketCount must be at least 1 + * If the RDD contains infinity, NaN throws an exception + * If the elements in RDD do not vary (max == min) throws an exception + */ + def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { + val result = srdd.histogram(bucketCount) + (result._1.map(scala.Double.box(_)), result._2) + } + /** + * Compute a histogram using the provided buckets. The buckets are all open + * to the left except for the last which is closed + * e.g. for the array + * [1,10,20,50] the buckets are [1,10) [10,20) [20,50] + * e.g 1<=x<10 , 10<=x<20, 20<=x<50 + * And on the input of 1 and 50 we would have a histogram of 1,0,0 + * + * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches + * from an O(log n) inseration to O(1) per element. (where n = # buckets) + * buckets must be sorted and not contain any duplicates. + * buckets array must be at least two elements + * All NaN entries are treated the same. + */ + def histogram(buckets: Array[Double]): Array[Long] = { + srdd.histogram(buckets.map(_.toDouble)) + } } object JavaDoubleRDD { diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index a4bec41752..776a83cefe 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -24,6 +24,8 @@ import org.apache.spark.partial.SumEvaluator import org.apache.spark.util.StatCounter import org.apache.spark.{TaskContext, Logging} +import scala.collection.immutable.NumericRange + /** * Extra functions available on RDDs of Doubles through an implicit conversion. * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions. @@ -76,4 +78,136 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { val evaluator = new SumEvaluator(self.partitions.size, confidence) self.context.runApproximateJob(self, processPartition, evaluator, timeout) } + + /** + * Compute a histogram of the data using bucketCount number of buckets evenly + * spaced between the minimum and maximum of the RDD. For example if the min + * value is 0 and the max is 100 and there are two buckets the resulting + * buckets will be [0,50) [50,100]. bucketCount must be at least 1 + * If the RDD contains infinity, NaN throws an exception + * If the elements in RDD do not vary (max == min) throws an exception + */ + def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { + // Compute the minimum and the maxium + val (max: Double, min: Double) = self.mapPartitions { items => + Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) => + (x._1.max(e),x._2.min(e)))) + }.reduce { (maxmin1, maxmin2) => + (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2)) + } + if (max.isNaN() || max.isInfinity || min.isInfinity ) { + throw new UnsupportedOperationException("Histogram on either an empty RDD or RDD containing +-infinity or NaN") + } + if (max == min) { + throw new UnsupportedOperationException("Histogram with no range in elements") + } + val increment: Double = (max-min)/bucketCount.toDouble + val range = Range.Double.inclusive(min, max, increment) + val buckets: Array[Double] = range.toArray + (buckets,histogram(buckets)) + } + /** + * Compute a histogram using the provided buckets. The buckets are all open + * to the left except for the last which is closed + * e.g. for the array + * [1,10,20,50] the buckets are [1,10) [10,20) [20,50] + * e.g 1<=x<10 , 10<=x<20, 20<=x<50 + * And on the input of 1 and 50 we would have a histogram of 1,0,0 + * + * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches + * from an O(log n) inseration to O(1) per element. (where n = # buckets) + * buckets must be sorted and not contain any duplicates. + * buckets array must be at least two elements + * All NaN entries are treated the same. + */ + def histogram(buckets: Array[Double]): Array[Long] = { + if (buckets.length < 2) { + throw new IllegalArgumentException("buckets array must have at least two elements") + } + // The histogramPartition function computes the partail histogram for a given + // partition. The provided bucketFunction determines which bucket in the array + // to increment or returns None if there is no bucket. This is done so we can + // specialize for uniformly distributed buckets and save the O(log n) binary + // search cost. + def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]): Iterator[Array[Long]] = { + val counters = new Array[Long](buckets.length-1) + while (iter.hasNext) { + bucketFunction(iter.next()) match { + case Some(x: Int) => {counters(x)+=1} + case _ => {} + } + } + Iterator(counters) + } + // Merge the counters. + def mergeCounters(a1: Array[Long], a2: Array[Long]): Array[Long] = { + a1.indices.foreach(i => a1(i) += a2(i)) + a1 + } + // Basic bucket function. This works using Java's built in Array + // binary search. Takes log(size(buckets)) + def basicBucketFunction(e: Double): Option[Int] = { + val location = java.util.Arrays.binarySearch(buckets, e) + if (location < 0) { + // If the location is less than 0 then the insertion point in the array + // to keep it sorted is -location-1 + val insertionPoint = -location-1 + // If we have to insert before the first element or after the last one + // its out of bounds. + // We do this rather than buckets.lengthCompare(insertionPoint) + // because Array[Double] fails to override it (for now). + if (insertionPoint > 0 && insertionPoint < buckets.length) { + Some(insertionPoint-1) + } else { + None + } + } else if (location < buckets.length-1) { + // Exact match, just insert here + Some(location) + } else { + // Exact match to the last element + Some(location-1) + } + } + // Determine the bucket function in constant time. Requires that buckets are evenly spaced + def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = { + // If our input is not a number unless the increment is also NaN then we fail fast + if (e.isNaN()) { + return None + } + val bucketNumber = (e-min)/(increment) + // We do this rather than buckets.lengthCompare(bucketNumber) + // because Array[Double] fails to override it (for now). + if (bucketNumber > count || bucketNumber < 0) { + None + } else { + Some(bucketNumber.toInt.min(count-1)) + } + } + def evenlySpaced(buckets: Array[Double]): Boolean = { + val delta = buckets(1)-buckets(0) + // Technically you could have an evenly spaced bucket with NaN + // increments but then its a single bucket and this makes the + // fastBucketFunction simpler. + if (delta.isNaN() || delta.isInfinite()) { + return false + } + for (i <- 1 to buckets.length-1) { + if (buckets(i)-buckets(i-1) != delta) { + return false + } + } + true + } + // Decide which bucket function to pass to histogramPartition. We decide here + // rather than having a general function so that the decission need only be made + // once rather than once per shard + val bucketFunction = if (evenlySpaced(buckets)) { + fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _ + } else { + basicBucketFunction _ + } + self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters) + } + } |