From 0793ee1b4dea1f4b0df749e8ad7c1ab70b512faf Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 9 Feb 2015 10:12:12 +0000 Subject: SPARK-2149. [MLLIB] Univariate kernel density estimation Author: Sandy Ryza Closes #1093 from sryza/sandy-spark-2149 and squashes the following commits: 5f06b33 [Sandy Ryza] More review comments 0f73060 [Sandy Ryza] Respond to Sean's review comments 0dfa005 [Sandy Ryza] SPARK-2149. Univariate kernel density estimation --- .../apache/spark/mllib/stat/KernelDensity.scala | 71 ++++++++++++++++++++++ .../org/apache/spark/mllib/stat/Statistics.scala | 14 +++++ .../spark/mllib/stat/KernelDensitySuite.scala | 47 ++++++++++++++ 3 files changed, 132 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala new file mode 100644 index 0000000000..0deef11b45 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat + +import org.apache.spark.rdd.RDD + +private[stat] object KernelDensity { + /** + * Given a set of samples from a distribution, estimates its density at the set of given points. + * Uses a Gaussian kernel with the given standard deviation. + */ + def estimate(samples: RDD[Double], standardDeviation: Double, + evaluationPoints: Array[Double]): Array[Double] = { + if (standardDeviation <= 0.0) { + throw new IllegalArgumentException("Standard deviation must be positive") + } + + // This gets used in each Gaussian PDF computation, so compute it up front + val logStandardDeviationPlusHalfLog2Pi = + Math.log(standardDeviation) + 0.5 * Math.log(2 * Math.PI) + + val (points, count) = samples.aggregate((new Array[Double](evaluationPoints.length), 0))( + (x, y) => { + var i = 0 + while (i < evaluationPoints.length) { + x._1(i) += normPdf(y, standardDeviation, logStandardDeviationPlusHalfLog2Pi, + evaluationPoints(i)) + i += 1 + } + (x._1, i) + }, + (x, y) => { + var i = 0 + while (i < evaluationPoints.length) { + x._1(i) += y._1(i) + i += 1 + } + (x._1, x._2 + y._2) + }) + + var i = 0 + while (i < points.length) { + points(i) /= count + i += 1 + } + points + } + + private def normPdf(mean: Double, standardDeviation: Double, + logStandardDeviationPlusHalfLog2Pi: Double, x: Double): Double = { + val x0 = x - mean + val x1 = x0 / standardDeviation + val logDensity = -0.5 * x1 * x1 - logStandardDeviationPlusHalfLog2Pi + Math.exp(logDensity) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index b3fad0c52d..32561620ac 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -149,4 +149,18 @@ object Statistics { def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = { ChiSqTest.chiSquaredFeatures(data) } + + /** + * Given an empirical distribution defined by the input RDD of samples, estimate its density at + * each of the given evaluation points using a Gaussian kernel. + * + * @param samples The samples RDD used to define the empirical distribution. + * @param standardDeviation The standard deviation of the kernel Gaussians. + * @param evaluationPoints The points at which to estimate densities. + * @return An array the same size as evaluationPoints with the density at each point. + */ + def kernelDensity(samples: RDD[Double], standardDeviation: Double, + evaluationPoints: Iterable[Double]): Array[Double] = { + KernelDensity.estimate(samples, standardDeviation, evaluationPoints.toArray) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala new file mode 100644 index 0000000000..f6a1e19f50 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat + +import org.scalatest.FunSuite + +import org.apache.commons.math3.distribution.NormalDistribution + +import org.apache.spark.mllib.util.LocalClusterSparkContext + +class KernelDensitySuite extends FunSuite with LocalClusterSparkContext { + test("kernel density single sample") { + val rdd = sc.parallelize(Array(5.0)) + val evaluationPoints = Array(5.0, 6.0) + val densities = KernelDensity.estimate(rdd, 3.0, evaluationPoints) + val normal = new NormalDistribution(5.0, 3.0) + val acceptableErr = 1e-6 + assert(densities(0) - normal.density(5.0) < acceptableErr) + assert(densities(0) - normal.density(6.0) < acceptableErr) + } + + test("kernel density multiple samples") { + val rdd = sc.parallelize(Array(5.0, 10.0)) + val evaluationPoints = Array(5.0, 6.0) + val densities = KernelDensity.estimate(rdd, 3.0, evaluationPoints) + val normal1 = new NormalDistribution(5.0, 3.0) + val normal2 = new NormalDistribution(10.0, 3.0) + val acceptableErr = 1e-6 + assert(densities(0) - (normal1.density(5.0) + normal2.density(5.0)) / 2 < acceptableErr) + assert(densities(0) - (normal1.density(6.0) + normal2.density(6.0)) / 2 < acceptableErr) + } +} -- cgit v1.2.3