author    Sandy Ryza <sandy@cloudera.com>    2015-02-09 10:12:12 +0000
committer Sean Owen <sowen@cloudera.com>     2015-02-09 10:12:12 +0000
commit    0793ee1b4dea1f4b0df749e8ad7c1ab70b512faf (patch)
tree      0f68f9ad5003f2c173bf0a37392b5ce2636802e9 /mllib
parent    4dfe180fc893bee1146161f8b2a6efd4d6d2bb8c (diff)
SPARK-2149. [MLLIB] Univariate kernel density estimation
Author: Sandy Ryza <sandy@cloudera.com>

Closes #1093 from sryza/sandy-spark-2149 and squashes the following commits:

5f06b33 [Sandy Ryza] More review comments
0f73060 [Sandy Ryza] Respond to Sean's review comments
0dfa005 [Sandy Ryza] SPARK-2149. Univariate kernel density estimation
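For reference, the estimator this patch implements is the standard univariate Gaussian kernel density estimate: given samples x_1, ..., x_n and a fixed kernel standard deviation (bandwidth) sigma, the estimated density at an evaluation point p is

    \hat{f}(p) = \frac{1}{n} \sum_{i=1}^{n} \frac{1}{\sigma \sqrt{2\pi}} \exp\left(-\frac{(p - x_i)^2}{2\sigma^2}\right)

i.e. the per-point average of Gaussian PDFs that KernelDensity.estimate accumulates in the diff below.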
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala       71
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala          14
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala  47
3 files changed, 132 insertions(+), 0 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala
new file mode 100644
index 0000000000..0deef11b45
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.rdd.RDD
+
+private[stat] object KernelDensity {
+  /**
+   * Given a set of samples from a distribution, estimates its density at the set of given points.
+   * Uses a Gaussian kernel with the given standard deviation.
+   */
+  def estimate(samples: RDD[Double], standardDeviation: Double,
+      evaluationPoints: Array[Double]): Array[Double] = {
+    if (standardDeviation <= 0.0) {
+      throw new IllegalArgumentException("Standard deviation must be positive")
+    }
+
+    // This gets used in each Gaussian PDF computation, so compute it up front
+    val logStandardDeviationPlusHalfLog2Pi =
+      Math.log(standardDeviation) + 0.5 * Math.log(2 * Math.PI)
+
+    val (points, count) = samples.aggregate((new Array[Double](evaluationPoints.length), 0))(
+      (x, y) => {
+        var i = 0
+        while (i < evaluationPoints.length) {
+          x._1(i) += normPdf(y, standardDeviation, logStandardDeviationPlusHalfLog2Pi,
+            evaluationPoints(i))
+          i += 1
+        }
+        (x._1, x._2 + 1)
+      },
+      (x, y) => {
+        var i = 0
+        while (i < evaluationPoints.length) {
+          x._1(i) += y._1(i)
+          i += 1
+        }
+        (x._1, x._2 + y._2)
+      })
+
+    var i = 0
+    while (i < points.length) {
+      points(i) /= count
+      i += 1
+    }
+    points
+  }
+
+  private def normPdf(mean: Double, standardDeviation: Double,
+      logStandardDeviationPlusHalfLog2Pi: Double, x: Double): Double = {
+    val x0 = x - mean
+    val x1 = x0 / standardDeviation
+    val logDensity = -0.5 * x1 * x1 - logStandardDeviationPlusHalfLog2Pi
+    Math.exp(logDensity)
+  }
+}
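For intuition, here is a minimal single-machine sketch (not part of the patch; the object name is illustrative) of what the aggregate above computes: the seqOp adds each sample's Gaussian PDF value at every evaluation point and counts the sample, the combOp sums the per-partition arrays and counts, and the driver divides by the total number of samples.

object LocalKernelDensitySketch {
  // Plain-Scala equivalent of KernelDensity.estimate, without the RDD machinery.
  def estimate(samples: Seq[Double], standardDeviation: Double,
      evaluationPoints: Array[Double]): Array[Double] = {
    require(standardDeviation > 0.0, "Standard deviation must be positive")
    // Constant term of the log Gaussian PDF, computed once up front.
    val logTerm = math.log(standardDeviation) + 0.5 * math.log(2 * math.Pi)
    val sums = new Array[Double](evaluationPoints.length)
    for (sample <- samples; i <- evaluationPoints.indices) {
      val z = (evaluationPoints(i) - sample) / standardDeviation
      sums(i) += math.exp(-0.5 * z * z - logTerm)
    }
    sums.map(_ / samples.size)  // average the per-sample kernel contributions
  }
}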
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index b3fad0c52d..32561620ac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -149,4 +149,18 @@ object Statistics {
   def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
     ChiSqTest.chiSquaredFeatures(data)
   }
+
+  /**
+   * Given an empirical distribution defined by the input RDD of samples, estimate its density at
+   * each of the given evaluation points using a Gaussian kernel.
+   *
+   * @param samples The samples RDD used to define the empirical distribution.
+   * @param standardDeviation The standard deviation of the kernel Gaussians.
+   * @param evaluationPoints The points at which to estimate densities.
+   * @return An array the same size as evaluationPoints with the density at each point.
+   */
+  def kernelDensity(samples: RDD[Double], standardDeviation: Double,
+      evaluationPoints: Iterable[Double]): Array[Double] = {
+    KernelDensity.estimate(samples, standardDeviation, evaluationPoints.toArray)
+  }
 }
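A hypothetical usage sketch of the new public entry point (assumes an already-created SparkContext named sc; the sample and evaluation values are illustrative, not from the patch):

import org.apache.spark.mllib.stat.Statistics

val samples = sc.parallelize(Seq(4.2, 5.1, 5.9, 7.3, 8.8))
// Estimate the density at 5.0, 6.0 and 7.0 using Gaussian kernels with standard deviation 1.0.
val densities = Statistics.kernelDensity(samples, 1.0, Seq(5.0, 6.0, 7.0))
// densities(i) holds the estimated density at the i-th evaluation point.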
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
new file mode 100644
index 0000000000..f6a1e19f50
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.scalatest.FunSuite
+
+import org.apache.commons.math3.distribution.NormalDistribution
+
+import org.apache.spark.mllib.util.LocalClusterSparkContext
+
+class KernelDensitySuite extends FunSuite with LocalClusterSparkContext {
+ test("kernel density single sample") {
+ val rdd = sc.parallelize(Array(5.0))
+ val evaluationPoints = Array(5.0, 6.0)
+ val densities = KernelDensity.estimate(rdd, 3.0, evaluationPoints)
+ val normal = new NormalDistribution(5.0, 3.0)
+ val acceptableErr = 1e-6
+    assert(math.abs(densities(0) - normal.density(5.0)) < acceptableErr)
+    assert(math.abs(densities(1) - normal.density(6.0)) < acceptableErr)
+  }
+
+  test("kernel density multiple samples") {
+    val rdd = sc.parallelize(Array(5.0, 10.0))
+    val evaluationPoints = Array(5.0, 6.0)
+    val densities = KernelDensity.estimate(rdd, 3.0, evaluationPoints)
+    val normal1 = new NormalDistribution(5.0, 3.0)
+    val normal2 = new NormalDistribution(10.0, 3.0)
+    val acceptableErr = 1e-6
+    assert(math.abs(densities(0) - (normal1.density(5.0) + normal2.density(5.0)) / 2) < acceptableErr)
+    assert(math.abs(densities(1) - (normal1.density(6.0) + normal2.density(6.0)) / 2) < acceptableErr)
+  }
+}