| author | Dan McClary <dan.mcclary@gmail.com> | 2014-03-18 00:45:47 -0700 |
|---|---|---|
| committer | Matei Zaharia <matei@databricks.com> | 2014-03-18 00:45:47 -0700 |
| commit | e3681f26fae7e87321ac991f5a0fb7517415803a | |
| tree | ecb920edc1484d1a3d7bda193546f1c71984e0a4 /core | |
| parent | 087eedca32fd87bfe1629588091bd307d45e4a7c | |
SPARK-1246: add min and max to StatCounter
This adds min and max to StatCounter (statscounter.py) and min and max methods to rdd.py, with matching additions to the Scala and Java RDD APIs; a brief usage sketch follows the commit log below.
Author: Dan McClary <dan.mcclary@gmail.com>
Closes #144 from dwmclary/SPARK-1246-add-min-max-to-stat-counter and squashes the following commits:
fd3fd4b [Dan McClary] fixed error, updated test
82cde0e [Dan McClary] flipped incorrectly assigned inf values in StatCounter
5d96799 [Dan McClary] added max and min to StatCounter repr for pyspark
21dd366 [Dan McClary] added max and min to StatCounter output, updated doc
1a97558 [Dan McClary] added max and min to StatCounter output, updated doc
a5c13b0 [Dan McClary] Added min and max to Scala and Java RDD, added min and max to StatCounter
ed67136 [Dan McClary] broke min/max out into separate transaction, added to rdd.py
1e7056d [Dan McClary] added underscore to getBucket
37a7dea [Dan McClary] cleaned up boundaries for histogram -- uses real min/max when buckets are derived
29981f2 [Dan McClary] fixed indentation on doctest comment
eaf89d9 [Dan McClary] added correct doctest for histogram
4916016 [Dan McClary] added histogram method, added max and min to statscounter
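For context, a minimal usage sketch of the new Scala API. This is illustrative only: it assumes an already-running SparkContext `sc`, which is not part of this patch, and `stats()` comes from the pre-existing DoubleRDDFunctions.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicits that provide stats() on RDD[Double]

// A minimal sketch, assuming a live SparkContext `sc`.
val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))

nums.max()  // 4.0, resolved via the implicit Ordering[Double]
nums.min()  // 1.0

// StatCounter now reports max and min alongside count/mean/stdev:
val stats = nums.stats()
println(stats)  // (count: 4, mean: 2.500000, stdev: 1.118034, max: 4.000000, min: 1.000000)
```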
Diffstat (limited to 'core')

| core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 20 |
| core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 |
| core/src/main/scala/org/apache/spark/util/StatCounter.scala | 18 |
| core/src/test/scala/org/apache/spark/PartitioningSuite.scala | 2 |
| core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 2 |

5 files changed, 52 insertions, 2 deletions
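The JavaRDDLike change in the diff below takes an explicit java.util.Comparator rather than an implicit Ordering. A sketch of calling it, written in Scala for consistency with the rest of the patch; `jsc` is an assumed, already-constructed JavaSparkContext:

```scala
import java.util.{Arrays, Comparator}
import org.apache.spark.api.java.JavaSparkContext

// Illustrative only: `jsc` is an assumed JavaSparkContext.
val jrdd = jsc.parallelize(Arrays.asList[Integer](3, 1, 4, 2))

// The comparator supplies the ordering; Serializable so it can ship to executors.
val byValue = new Comparator[Integer] with Serializable {
  override def compare(a: Integer, b: Integer): Int = a.compareTo(b)
}

jrdd.max(byValue)  // 4
jrdd.min(byValue)  // 1
```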
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 2ba4fb8c18..05b89b9857 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -479,6 +479,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Returns the maximum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the maximum of the RDD
+   */
+  def max(comp: Comparator[T]): T = {
+    rdd.max()(Ordering.comparatorToOrdering(comp))
+  }
+
+  /**
+   * Returns the minimum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the minimum of the RDD
+   */
+  def min(comp: Comparator[T]): T = {
+    rdd.min()(Ordering.comparatorToOrdering(comp))
+  }
+
+  /**
    * Returns the first K elements from this RDD using the
    * natural ordering for T while maintain the order.
    * @param num the number of top elements to return
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f8283fbbb9..ddb901246d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -957,6 +957,18 @@ abstract class RDD[T: ClassTag](
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
 
   /**
+   * Returns the max of this RDD as defined by the implicit Ordering[T].
+   * @return the maximum element of the RDD
+   */
+  def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
+
+  /**
+   * Returns the min of this RDD as defined by the implicit Ordering[T].
+   * @return the minimum element of the RDD
+   */
+  def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
+
+  /**
    * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String) {
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index f837dc7ccc..732748a7ff 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,6 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
+  private var maxValue: Double = Double.NegativeInfinity // Running max of our values
+  private var minValue: Double = Double.PositiveInfinity // Running min of our values
 
   merge(values)
 
@@ -41,6 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
       n += 1
       mu += delta / n
       m2 += delta * (value - mu)
+      maxValue = math.max(maxValue, value)
+      minValue = math.min(minValue, value)
       this
     }
 
@@ -58,7 +62,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
       if (n == 0) {
         mu = other.mu
         m2 = other.m2
-        n = other.n
+        n = other.n
+        maxValue = other.maxValue
+        minValue = other.minValue
       } else if (other.n != 0) {
         val delta = other.mu - mu
         if (other.n * 10 < n) {
@@ -70,6 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
         }
         m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
         n += other.n
+        maxValue = math.max(maxValue, other.maxValue)
+        minValue = math.min(minValue, other.minValue)
       }
       this
     }
 
@@ -81,6 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     other.n = n
     other.mu = mu
     other.m2 = m2
+    other.maxValue = maxValue
+    other.minValue = minValue
     other
   }
 
@@ -90,6 +100,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
 
   def sum: Double = n * mu
 
+  def max: Double = maxValue
+
+  def min: Double = minValue
+
   /** Return the variance of the values.
   */
  def variance: Double = {
    if (n == 0) {
@@ -121,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   def sampleStdev: Double = math.sqrt(sampleVariance)
 
   override def toString: String = {
-    "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
+    "(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 4305686d3a..996db70809 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -171,6 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester {
     assert(abs(6.0/2 - rdd.mean) < 0.01)
     assert(abs(1.0 - rdd.variance) < 0.01)
     assert(abs(1.0 - rdd.stdev) < 0.01)
+    assert(stats.max === 4.0)
+    assert(stats.min === 2.0)
 
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 9512e0e6ee..d6b5fdc798 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -47,6 +47,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+    assert(nums.max() === 4)
+    assert(nums.min() === 1)
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
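A note on the StatCounter initial values: starting the running max at Double.NegativeInfinity and the running min at Double.PositiveInfinity makes them identity elements, so merging an empty counter is a no-op. That invariant is what commit 82cde0e above restores. A standalone sketch of the idea (hypothetical code, not part of the patch):

```scala
// Standalone illustration of the running min/max merge identities.
// With these defaults, merging an empty MinMax changes nothing.
case class MinMax(minValue: Double = Double.PositiveInfinity,
                  maxValue: Double = Double.NegativeInfinity) {
  def add(value: Double): MinMax =
    MinMax(math.min(minValue, value), math.max(maxValue, value))
  def merge(other: MinMax): MinMax =
    MinMax(math.min(minValue, other.minValue), math.max(maxValue, other.maxValue))
}

// MinMax().merge(MinMax().add(2.0).add(4.0)) == MinMax(2.0, 4.0)
```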