author     Dan McClary <dan.mcclary@gmail.com>    2014-03-18 00:45:47 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-03-18 00:45:47 -0700
commit     e3681f26fae7e87321ac991f5a0fb7517415803a (patch)
tree       ecb920edc1484d1a3d7bda193546f1c71984e0a4
parent     087eedca32fd87bfe1629588091bd307d45e4a7c (diff)
SPARK-1246: add min and max to StatCounter
Here's the addition of min and max to statscounter.py and min and max methods to rdd.py.

Author: Dan McClary <dan.mcclary@gmail.com>

Closes #144 from dwmclary/SPARK-1246-add-min-max-to-stat-counter and squashes the following commits:

fd3fd4b [Dan McClary] fixed error, updated test
82cde0e [Dan McClary] flipped incorrectly assigned inf values in StatCounter
5d96799 [Dan McClary] added max and min to StatCounter repr for pyspark
21dd366 [Dan McClary] added max and min to StatCounter output, updated doc
1a97558 [Dan McClary] added max and min to StatCounter output, updated doc
a5c13b0 [Dan McClary] Added min and max to Scala and Java RDD, added min and max to StatCounter
ed67136 [Dan McClary] broke min/max out into separate transaction, added to rdd.py
1e7056d [Dan McClary] added underscore to getBucket
37a7dea [Dan McClary] cleaned up boundaries for histogram -- uses real min/max when buckets are derived
29981f2 [Dan McClary] fixed indentation on doctest comment
eaf89d9 [Dan McClary] added correct doctest for histogram
4916016 [Dan McClary] added histogram method, added max and min to statscounter
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala  | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala               | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/util/StatCounter.scala      | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala     |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala          |  2
-rw-r--r--  python/pyspark/rdd.py                                            | 19
-rw-r--r--  python/pyspark/statcounter.py                                    | 25
7 files changed, 93 insertions(+), 5 deletions(-)
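
To give a sense of the API this commit adds, here is a minimal Scala usage sketch (assumes a live SparkContext sc; the numbers in the comments are illustrative and rounded):

val nums = sc.parallelize(Seq(1.0, 5.0, 43.0, 10.0))
nums.max()   // 43.0, using the implicit Ordering[Double]
nums.min()   // 1.0

// stats() folds every value into a StatCounter, which now also tracks max and min:
println(nums.stats())
// roughly: (count: 4, mean: 14.750000, stdev: 16.618890, max: 43.000000, min: 1.000000)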
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 2ba4fb8c18..05b89b9857 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -479,6 +479,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Returns the maximum element from this RDD as defined by the specified
+ * Comparator[T].
+ * @param comp the comparator that defines ordering
+ * @return the maximum of the RDD
+ */
+ def max(comp: Comparator[T]): T = {
+ rdd.max()(Ordering.comparatorToOrdering(comp))
+ }
+
+ /**
+ * Returns the minimum element from this RDD as defined by the specified
+ * Comparator[T].
+ * @param comp the comparator that defines ordering
+ * @return the minimum of the RDD
+ */
+ def min(comp: Comparator[T]): T = {
+ rdd.min()(Ordering.comparatorToOrdering(comp))
+ }
+
+ /**
* Returns the first K elements from this RDD using the
* natural ordering for T while maintaining the order.
* @param num the number of top elements to return
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f8283fbbb9..ddb901246d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -957,6 +957,18 @@ abstract class RDD[T: ClassTag](
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
/**
+ * Returns the max of this RDD as defined by the implicit Ordering[T].
+ * @return the maximum element of the RDD
+ */
+ def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
+
+ /**
+ * Returns the min of this RDD as defined by the implicit Ordering[T].
+ * @return the minimum element of the RDD
+ */
+ def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
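
Because max() and min() take their Ordering[T] in a second, implicit parameter list, callers can rely on the natural ordering or pass one explicitly. A small sketch (the sample data is hypothetical):

val words = sc.parallelize(Seq("zz", "apache", "m"))

// Natural (lexicographic) ordering for String:
words.max()   // "zz"
words.min()   // "apache"

// An explicit Ordering changes what "max" means, e.g. the longest string:
val byLength = Ordering.by[String, Int](_.length)
words.max()(byLength)   // "apache"
words.min()(byLength)   // "m"

The Java-facing max(Comparator)/min(Comparator) overloads above delegate to these methods through Ordering.comparatorToOrdering.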
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index f837dc7ccc..732748a7ff 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,6 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
+ private var maxValue: Double = Double.NegativeInfinity // Running max of our values
+ private var minValue: Double = Double.PositiveInfinity // Running min of our values
merge(values)
@@ -41,6 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
n += 1
mu += delta / n
m2 += delta * (value - mu)
+ maxValue = math.max(maxValue, value)
+ minValue = math.min(minValue, value)
this
}
@@ -58,7 +62,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
if (n == 0) {
mu = other.mu
m2 = other.m2
- n = other.n
+ n = other.n
+ maxValue = other.maxValue
+ minValue = other.minValue
} else if (other.n != 0) {
val delta = other.mu - mu
if (other.n * 10 < n) {
@@ -70,6 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
}
m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
n += other.n
+ maxValue = math.max(maxValue, other.maxValue)
+ minValue = math.min(minValue, other.minValue)
}
this
}
@@ -81,6 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
other.n = n
other.mu = mu
other.m2 = m2
+ other.maxValue = maxValue
+ other.minValue = minValue
other
}
@@ -90,6 +100,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
def sum: Double = n * mu
+ def max: Double = maxValue
+
+ def min: Double = minValue
+
/** Return the variance of the values. */
def variance: Double = {
if (n == 0) {
@@ -121,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
def sampleStdev: Double = math.sqrt(sampleVariance)
override def toString: String = {
- "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
+ "(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
}
}
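
A brief sketch of how the new running max/min compose (values illustrative). maxValue and minValue start at -Infinity and +Infinity, so the first value merged in becomes both the max and the min, and merging two counters takes math.max/math.min of the two sides; this matters because stats() on an RDD builds one StatCounter per partition and then merges them:

import org.apache.spark.util.StatCounter

val left  = new StatCounter(Seq(2.0, 3.0))   // max = 3.0, min = 2.0
val right = new StatCounter(Seq(4.0))        // max = 4.0, min = 4.0

left.merge(right)
println(left.max)   // 4.0
println(left.min)   // 2.0

// Merging an empty counter leaves max/min (and the other statistics) unchanged:
left.merge(new StatCounter())
println(left.max)   // still 4.0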
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 4305686d3a..996db70809 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -171,6 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(abs(6.0/2 - rdd.mean) < 0.01)
assert(abs(1.0 - rdd.variance) < 0.01)
assert(abs(1.0 - rdd.stdev) < 0.01)
+ assert(stats.max === 4.0)
+ assert(stats.min === 2.0)
// Add other tests here for classes that should be able to handle empty partitions correctly
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 9512e0e6ee..d6b5fdc798 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -47,6 +47,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+ assert(nums.max() === 4)
+ assert(nums.min() === 1)
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f3b432ff24..ae09dbff02 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -571,7 +571,26 @@ class RDD(object):
return reduce(op, vals, zeroValue)
# TODO: aggregate
+
+
+ def max(self):
+ """
+ Find the maximum item in this RDD.
+
+ >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
+ 43.0
+ """
+ return self.reduce(max)
+
+ def min(self):
+ """
+ Find the minimum item in this RDD.
+
+ >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
+ 1.0
+ """
+ return self.reduce(min)
+
def sum(self):
"""
Add up the elements in this RDD.
diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 8e1cbd4ad9..080325061a 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -26,7 +26,9 @@ class StatCounter(object):
self.n = 0L # Running count of our values
self.mu = 0.0 # Running mean of our values
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)
-
+ self.maxValue = float("-inf")
+ self.minValue = float("inf")
+
for v in values:
self.merge(v)
@@ -36,6 +38,11 @@ class StatCounter(object):
self.n += 1
self.mu += delta / self.n
self.m2 += delta * (value - self.mu)
+ if self.maxValue < value:
+ self.maxValue = value
+ if self.minValue > value:
+ self.minValue = value
+
return self
# Merge another StatCounter into this one, adding up the internal statistics.
@@ -49,7 +56,10 @@ class StatCounter(object):
if self.n == 0:
self.mu = other.mu
self.m2 = other.m2
- self.n = other.n
+ self.n = other.n
+ self.maxValue = other.maxValue
+ self.minValue = other.minValue
+
elif other.n != 0:
delta = other.mu - self.mu
if other.n * 10 < self.n:
@@ -58,6 +68,9 @@ class StatCounter(object):
self.mu = other.mu - (delta * self.n) / (self.n + other.n)
else:
self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
+
+ self.maxValue = max(self.maxValue, other.maxValue)
+ self.minValue = min(self.minValue, other.minValue)
self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
self.n += other.n
@@ -76,6 +89,12 @@ class StatCounter(object):
def sum(self):
return self.n * self.mu
+ def min(self):
+ return self.minValue
+
+ def max(self):
+ return self.maxValue
+
# Return the variance of the values.
def variance(self):
if self.n == 0:
@@ -105,5 +124,5 @@ class StatCounter(object):
return math.sqrt(self.sampleVariance())
def __repr__(self):
- return "(count: %s, mean: %s, stdev: %s)" % (self.count(), self.mean(), self.stdev())
+ return "(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % (self.count(), self.mean(), self.stdev(), self.max(), self.min())