Diffstat (limited to 'mllib/src')
-rw-r--r--  mllib/src/main/scala/spark/mllib/clustering/KMeans.scala  334
-rw-r--r--  mllib/src/main/scala/spark/mllib/clustering/KMeansModel.scala  44
-rw-r--r--  mllib/src/main/scala/spark/mllib/clustering/LocalKMeans.scala  105
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Gradient.scala  50
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala  79
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Updater.scala  44
-rw-r--r--  mllib/src/main/scala/spark/mllib/recommendation/ALS.scala  436
-rw-r--r--  mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala  40
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala  175
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/LogisticRegressionGenerator.scala  58
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/Regression.scala  38
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala  211
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/RidgeRegressionGenerator.scala  72
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/MLUtils.scala  112
-rw-r--r--  mllib/src/test/resources/log4j.properties  28
-rw-r--r--  mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala  170
-rw-r--r--  mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala  97
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala  74
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala  64
19 files changed, 2231 insertions, 0 deletions
diff --git a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
new file mode 100644
index 0000000000..d875d6de50
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.clustering
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import spark.{SparkContext, RDD}
+import spark.SparkContext._
+import spark.Logging
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+
+/**
+ * K-means clustering with support for multiple parallel runs and a k-means++ like initialization
+ * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
+ * they are executed together with joint passes over the data for efficiency.
+ *
+ * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
+ * to it should be cached by the user.
+ */
+class KMeans private (
+ var k: Int,
+ var maxIterations: Int,
+ var runs: Int,
+ var initializationMode: String,
+ var initializationSteps: Int,
+ var epsilon: Double)
+ extends Serializable with Logging
+{
+ private type ClusterCenters = Array[Array[Double]]
+
+ def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
+
+ /** Set the number of clusters to create (k). Default: 2. */
+ def setK(k: Int): KMeans = {
+ this.k = k
+ this
+ }
+
+ /** Set maximum number of iterations to run. Default: 20. */
+ def setMaxIterations(maxIterations: Int): KMeans = {
+ this.maxIterations = maxIterations
+ this
+ }
+
+ /**
+ * Set the initialization algorithm. This can be either "random" to choose random points as
+ * initial cluster centers, or "k-means||" to use a parallel variant of k-means++
+ * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||.
+ */
+ def setInitializationMode(initializationMode: String): KMeans = {
+ if (initializationMode != KMeans.RANDOM && initializationMode != KMeans.K_MEANS_PARALLEL) {
+ throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode)
+ }
+ this.initializationMode = initializationMode
+ this
+ }
+
+ /**
+ * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
+ * this many times with random starting conditions (configured by the initialization mode), then
+ * return the best clustering found over any run. Default: 1.
+ */
+ def setRuns(runs: Int): KMeans = {
+ if (runs <= 0) {
+ throw new IllegalArgumentException("Number of runs must be positive")
+ }
+ this.runs = runs
+ this
+ }
+
+ /**
+ * Set the number of steps for the k-means|| initialization mode. This is an advanced
+ * setting -- the default of 5 is almost always enough. Default: 5.
+ */
+ def setInitializationSteps(initializationSteps: Int): KMeans = {
+ if (initializationSteps <= 0) {
+ throw new IllegalArgumentException("Number of initialization steps must be positive")
+ }
+ this.initializationSteps = initializationSteps
+ this
+ }
+
+ /**
+ * Set the distance threshold within which we consider centers to have converged.
+ * If all centers move less than this Euclidean distance, we stop iterating one run.
+ */
+ def setEpsilon(epsilon: Double): KMeans = {
+ this.epsilon = epsilon
+ this
+ }
+
+ /**
+ * Train a K-means model on the given set of points; `data` should be cached for high
+ * performance, because this is an iterative algorithm.
+ */
+ def train(data: RDD[Array[Double]]): KMeansModel = {
+ // TODO: check whether data is persistent; this needs RDD.storageLevel to be publicly readable
+
+ val sc = data.sparkContext
+
+ val centers = if (initializationMode == KMeans.RANDOM) {
+ initRandom(data)
+ } else {
+ initKMeansParallel(data)
+ }
+
+ val active = Array.fill(runs)(true)
+ val costs = Array.fill(runs)(0.0)
+
+ var activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
+ var iteration = 0
+
+ // Execute iterations of Lloyd's algorithm until all runs have converged
+ while (iteration < maxIterations && !activeRuns.isEmpty) {
+ type WeightedPoint = (DoubleMatrix, Long)
+ def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
+ (p1._1.addi(p2._1), p1._2 + p2._2)
+ }
+
+ val activeCenters = activeRuns.map(r => centers(r)).toArray
+ val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
+
+ // Find the sum and count of points mapping to each center
+ val totalContribs = data.mapPartitions { points =>
+ val runs = activeCenters.length
+ val k = activeCenters(0).length
+ val dims = activeCenters(0)(0).length
+
+ val sums = Array.fill(runs, k)(new DoubleMatrix(dims))
+ val counts = Array.fill(runs, k)(0L)
+
+ for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) {
+ val (bestCenter, cost) = KMeans.findClosest(centers, point)
+ costAccums(runIndex) += cost
+ sums(runIndex)(bestCenter).addi(new DoubleMatrix(point))
+ counts(runIndex)(bestCenter) += 1
+ }
+
+ val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
+ ((i, j), (sums(i)(j), counts(i)(j)))
+ }
+ contribs.iterator
+ }.reduceByKey(mergeContribs).collectAsMap()
+
+ // Update the cluster centers and costs for each active run
+ for ((run, i) <- activeRuns.zipWithIndex) {
+ var changed = false
+ for (j <- 0 until k) {
+ val (sum, count) = totalContribs((i, j))
+ if (count != 0) {
+ val newCenter = sum.divi(count).data
+ if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
+ changed = true
+ }
+ centers(run)(j) = newCenter
+ }
+ }
+ if (!changed) {
+ active(run) = false
+ logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+ }
+ costs(run) = costAccums(i).value
+ }
+
+ activeRuns = activeRuns.filter(active(_))
+ iteration += 1
+ }
+
+ val bestRun = costs.zipWithIndex.min._2
+ new KMeansModel(centers(bestRun))
+ }
+
+ /**
+ * Initialize `runs` sets of cluster centers at random.
+ */
+ private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
+ // Sample all the cluster centers in one pass to avoid repeated scans
+ val sample = data.takeSample(true, runs * k, new Random().nextInt())
+ Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k))
+ }
+
+ /**
+ * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
+ * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
+ * to find dissimilar cluster centers by starting with a random center and then doing
+ * passes where more centers are chosen with probability proportional to their squared distance
+ * to the current cluster set. It results in a provable approximation to an optimal clustering.
+ *
+ * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
+ */
+ private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
+ // Initialize each run's center to a random point
+ val seed = new Random().nextInt()
+ val sample = data.takeSample(true, runs, seed)
+ val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
+
+ // On each step, sample 2 * k points on average for each run with probability proportional
+ // to their squared distance from that run's current centers
+ for (step <- 0 until initializationSteps) {
+ val centerArrays = centers.map(_.toArray)
+ val sumCosts = data.flatMap { point =>
+ for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point))
+ }.reduceByKey(_ + _).collectAsMap()
+ val chosen = data.mapPartitionsWithIndex { (index, points) =>
+ val rand = new Random(seed ^ (step << 16) ^ index)
+ for {
+ p <- points
+ r <- 0 until runs
+ if rand.nextDouble() < KMeans.pointCost(centerArrays(r), p) * 2 * k / sumCosts(r)
+ } yield (r, p)
+ }.collect()
+ for ((r, p) <- chosen) {
+ centers(r) += p
+ }
+ }
+
+ // Finally, we might have a set of more than k candidate centers for each run; weigh each
+ // candidate by the number of points in the dataset mapping to it and run a local k-means++
+ // on the weighted centers to pick just k of them
+ val centerArrays = centers.map(_.toArray)
+ val weightMap = data.flatMap { p =>
+ for (r <- 0 until runs) yield ((r, KMeans.findClosest(centerArrays(r), p)._1), 1.0)
+ }.reduceByKey(_ + _).collectAsMap()
+ val finalCenters = (0 until runs).map { r =>
+ val myCenters = centers(r).toArray
+ val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
+ LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
+ }
+
+ finalCenters.toArray
+ }
+}
+
+
+/**
+ * Top-level methods for calling K-means clustering.
+ */
+object KMeans {
+ // Initialization mode names
+ val RANDOM = "random"
+ val K_MEANS_PARALLEL = "k-means||"
+
+ def train(
+ data: RDD[Array[Double]],
+ k: Int,
+ maxIterations: Int,
+ runs: Int,
+ initializationMode: String)
+ : KMeansModel =
+ {
+ new KMeans().setK(k)
+ .setMaxIterations(maxIterations)
+ .setRuns(runs)
+ .setInitializationMode(initializationMode)
+ .train(data)
+ }
+
+ def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
+ train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
+ }
+
+ def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
+ train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
+ }
+
+ /**
+ * Return the index of the closest point in `centers` to `point`, as well as its squared distance.
+ */
+ private[mllib] def findClosest(centers: Array[Array[Double]], point: Array[Double])
+ : (Int, Double) =
+ {
+ var bestDistance = Double.PositiveInfinity
+ var bestIndex = 0
+ for (i <- 0 until centers.length) {
+ val distance = MLUtils.squaredDistance(point, centers(i))
+ if (distance < bestDistance) {
+ bestDistance = distance
+ bestIndex = i
+ }
+ }
+ (bestIndex, bestDistance)
+ }
+
+ /**
+ * Return the K-means cost of a given point against the given cluster centers, i.e. the squared distance to the closest center.
+ */
+ private[mllib] def pointCost(centers: Array[Array[Double]], point: Array[Double]): Double = {
+ var bestDistance = Double.PositiveInfinity
+ for (i <- 0 until centers.length) {
+ val distance = MLUtils.squaredDistance(point, centers(i))
+ if (distance < bestDistance) {
+ bestDistance = distance
+ }
+ }
+ bestDistance
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ println("Usage: KMeans <master> <input_file> <k> <max_iterations>")
+ System.exit(1)
+ }
+ val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
+ val sc = new SparkContext(master, "KMeans")
+ val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble))
+ val model = KMeans.train(data, k, iters)
+ val cost = model.computeCost(data)
+ println("Cluster centers:")
+ for (c <- model.clusterCenters) {
+ println(" " + c.mkString(" "))
+ }
+ println("Cost: " + cost)
+ System.exit(0)
+ }
+}
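Usage sketch (not part of the patch): training with the KMeans object added above. It assumes a whitespace-separated input file of numeric vectors; the path and parameter values are illustrative.

    import spark.SparkContext
    import spark.mllib.clustering.KMeans

    val sc = new SparkContext("local", "KMeansExample")
    // Parse each line into a dense Double vector; cache it because K-means is iterative.
    val points = sc.textFile("data/points.txt")
      .map(line => line.split(' ').map(_.toDouble))
      .cache()
    // k = 3 clusters, up to 20 iterations, 2 parallel runs, k-means|| initialization.
    val model = KMeans.train(points, 3, 20, 2, KMeans.K_MEANS_PARALLEL)
    println("Within-set sum of squared errors: " + model.computeCost(points))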
diff --git a/mllib/src/main/scala/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/spark/mllib/clustering/KMeansModel.scala
new file mode 100644
index 0000000000..b8f80e80cd
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/clustering/KMeansModel.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.clustering
+
+import spark.RDD
+import spark.SparkContext._
+import spark.mllib.util.MLUtils
+
+
+/**
+ * A clustering model for K-means. Each point belongs to the cluster with the closest center.
+ */
+class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable {
+ /** Total number of clusters. */
+ def k: Int = clusterCenters.length
+
+ /** Return the cluster index that a given point belongs to. */
+ def predict(point: Array[Double]): Int = {
+ KMeans.findClosest(clusterCenters, point)._1
+ }
+
+ /**
+ * Return the K-means cost (sum of squared distances of points to their nearest center) for this
+ * model on the given data.
+ */
+ def computeCost(data: RDD[Array[Double]]): Double = {
+ data.map(p => KMeans.pointCost(clusterCenters, p)).sum
+ }
+}
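Usage sketch (not part of the patch): querying a trained KMeansModel. The point and the `points` RDD are illustrative and assume the same dimensionality as the training data.

    // Assuming `model: KMeansModel` and a cached RDD `points` as in the KMeans example above.
    val clusterIndex = model.predict(Array(0.2, 1.5))   // index of the nearest center
    val cost = model.computeCost(points)                // sum of squared distances
    model.clusterCenters.foreach(center => println(center.mkString(" ")))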
diff --git a/mllib/src/main/scala/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/spark/mllib/clustering/LocalKMeans.scala
new file mode 100644
index 0000000000..89fe7d7e85
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/clustering/LocalKMeans.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.clustering
+
+import scala.util.Random
+
+import org.jblas.{DoubleMatrix, SimpleBlas}
+
+/**
+ * A utility object to run K-means locally. This is private to the ML package because it's used
+ * in the initialization of KMeans but not meant to be publicly exposed.
+ */
+private[mllib] object LocalKMeans {
+ /**
+ * Run K-means++ on the weighted point set `points`. This first does the K-means++
+ * initialization procedure and then runs rounds of Lloyd's algorithm.
+ */
+ def kMeansPlusPlus(
+ seed: Int,
+ points: Array[Array[Double]],
+ weights: Array[Double],
+ k: Int,
+ maxIterations: Int)
+ : Array[Array[Double]] =
+ {
+ val rand = new Random(seed)
+ val dimensions = points(0).length
+ val centers = new Array[Array[Double]](k)
+
+ // Initialize centers by sampling using the k-means++ procedure
+ centers(0) = pickWeighted(rand, points, weights)
+ for (i <- 1 until k) {
+ // Pick the next center with a probability proportional to cost under current centers
+ val curCenters = centers.slice(0, i)
+ val sum = points.zip(weights).map { case (p, w) =>
+ w * KMeans.pointCost(curCenters, p)
+ }.sum
+ val r = rand.nextDouble() * sum
+ var cumulativeScore = 0.0
+ var j = 0
+ while (j < points.length && cumulativeScore < r) {
+ cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
+ j += 1
+ }
+ centers(i) = points(j-1)
+ }
+
+ // Run up to maxIterations iterations of Lloyd's algorithm
+ val oldClosest = Array.fill(points.length)(-1)
+ var iteration = 0
+ var moved = true
+ while (moved && iteration < maxIterations) {
+ moved = false
+ val sums = Array.fill(k)(new DoubleMatrix(dimensions))
+ val counts = Array.fill(k)(0.0)
+ for ((p, i) <- points.zipWithIndex) {
+ val index = KMeans.findClosest(centers, p)._1
+ SimpleBlas.axpy(weights(i), new DoubleMatrix(p), sums(index))
+ counts(index) += weights(i)
+ if (index != oldClosest(i)) {
+ moved = true
+ oldClosest(i) = index
+ }
+ }
+ // Update centers
+ for (i <- 0 until k) {
+ if (counts(i) == 0.0) {
+ // Assign center to a random point
+ centers(i) = points(rand.nextInt(points.length))
+ } else {
+ centers(i) = sums(i).divi(counts(i)).data
+ }
+ }
+ iteration += 1
+ }
+
+ centers
+ }
+
+ private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = {
+ val r = rand.nextDouble() * weights.sum
+ var i = 0
+ var curWeight = 0.0
+ while (i < data.length && curWeight < r) {
+ curWeight += weights(i)
+ i += 1
+ }
+ data(i - 1)
+ }
+}
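The cumulative-weight scan used by `pickWeighted` can be illustrated in isolation. The sketch below is standalone illustrative code, not part of the patch, showing that an index is drawn with probability proportional to its weight.

    import scala.util.Random

    // Pick an index with probability proportional to its weight (cumulative-weight scan).
    def pickWeightedIndex(rand: Random, weights: Array[Double]): Int = {
      val r = rand.nextDouble() * weights.sum
      var i = 0
      var cumulative = weights(0)
      while (i < weights.length - 1 && cumulative < r) {
        i += 1
        cumulative += weights(i)
      }
      i
    }

    val rand = new Random(42)
    val counts = Array.fill(3)(0)
    for (_ <- 0 until 10000) counts(pickWeightedIndex(rand, Array(1.0, 2.0, 7.0))) += 1
    println(counts.mkString(" "))  // roughly 1000, 2000, 7000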
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
new file mode 100644
index 0000000000..2fb0c8136f
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import org.jblas.DoubleMatrix
+
+abstract class Gradient extends Serializable {
+ /**
+ * Compute the gradient for a given row of data.
+ *
+ * @param data - One row of data. Row matrix of size 1xn where n is the number of features.
+ * @param label - Label for this data item.
+ * @param weights - Column matrix containing weights for every feature.
+ */
+ def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+ (DoubleMatrix, Double)
+}
+
+class LogisticGradient extends Gradient {
+ override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+ (DoubleMatrix, Double) = {
+ val margin: Double = -1.0 * data.dot(weights)
+ val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+ val gradient = data.mul(gradientMultiplier)
+ val loss =
+ if (margin > 0) {
+ math.log(1 + math.exp(0 - margin))
+ } else {
+ math.log(1 + math.exp(margin)) - margin
+ }
+
+ (gradient, loss)
+ }
+}
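A quick standalone check of LogisticGradient on a single example, with illustrative values: when w.x = 0, the multiplier is sigmoid(0) - label = -0.5 and the loss is log 2.

    import org.jblas.DoubleMatrix
    import spark.mllib.optimization.LogisticGradient

    val x = new DoubleMatrix(1, 2, 1.0, 2.0)     // one example with two features (row matrix)
    val w = new DoubleMatrix(2, 1, 0.5, -0.25)   // weights as a column vector; w.x = 0
    val (grad, loss) = new LogisticGradient().compute(x, 1.0, w)
    println(grad)   // roughly [-0.5, -1.0], i.e. (sigmoid(w.x) - label) * x
    println(loss)   // log(1 + exp(0)), about 0.693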
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
new file mode 100644
index 0000000000..e1b73bc25e
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import spark.{Logging, RDD, SparkContext}
+import spark.SparkContext._
+
+import org.jblas.DoubleMatrix
+
+import scala.collection.mutable.ArrayBuffer
+
+
+object GradientDescent {
+
+ /**
+ * Run gradient descent in parallel using mini batches.
+ * Based on Matlab code written by John Duchi.
+ *
+ * @param data - Input data for SGD. RDD of form (label, [feature values]).
+ * @param gradient - Gradient object that will be used to compute the gradient.
+ * @param updater - Updater object that will be used to update the model.
+ * @param stepSize - step size to be used for each iteration of SGD.
+ * @param numIters - number of iterations of SGD to run.
+ * @param miniBatchFraction - fraction of the input data set that should be used for
+ * one iteration of SGD. Default value 1.0.
+ *
+ * @return weights - Column matrix containing weights for every feature.
+ * @return lossHistory - Array containing the loss computed for every iteration.
+ */
+ def runMiniBatchSGD(
+ data: RDD[(Double, Array[Double])],
+ gradient: Gradient,
+ updater: Updater,
+ stepSize: Double,
+ numIters: Int,
+ miniBatchFraction: Double=1.0) : (DoubleMatrix, Array[Double]) = {
+
+ val lossHistory = new ArrayBuffer[Double](numIters)
+
+ val nfeatures: Int = data.take(1)(0)._2.length
+ val nexamples: Long = data.count()
+ val miniBatchSize = nexamples * miniBatchFraction
+
+ // Initialize weights as a column matrix
+ var weights = DoubleMatrix.ones(nfeatures)
+ var reg_val = 0.0
+
+ for (i <- 1 to numIters) {
+ val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
+ case (y, features) =>
+ val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
+ val (grad, loss) = gradient.compute(featuresRow, y, weights)
+ (grad, loss)
+ }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
+
+ lossHistory.append(lossSum / miniBatchSize + reg_val)
+ val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i)
+ weights = update._1
+ reg_val = update._2
+ }
+
+ (weights, lossHistory.toArray)
+ }
+}
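Usage sketch (not part of the patch): running mini-batch SGD directly with the gradient and updater classes from this patch on a tiny in-memory dataset; all values are illustrative.

    import spark.SparkContext
    import spark.mllib.optimization.{GradientDescent, LogisticGradient, SimpleUpdater}

    val sc = new SparkContext("local", "SGDExample")
    // (label, features) pairs: label 1.0 near (3, 3), label 0.0 near (0, 0).
    val data = sc.parallelize(Seq(
      (1.0, Array(3.0, 2.8)), (1.0, Array(2.9, 3.1)),
      (0.0, Array(0.1, -0.2)), (0.0, Array(-0.1, 0.3))))
    val (weights, lossHistory) = GradientDescent.runMiniBatchSGD(
      data, new LogisticGradient(), new SimpleUpdater(), 1.0, 50, 1.0)
    println("Final weights: " + weights + ", last loss: " + lossHistory.last)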
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
new file mode 100644
index 0000000000..b864fd4634
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import org.jblas.DoubleMatrix
+
+abstract class Updater extends Serializable {
+ /**
+ * Compute an updated value for weights given the gradient, stepSize and iteration number.
+ *
+ * @param weightsOld - Column matrix of size nx1 where n is the number of features.
+ * @param gradient - Column matrix of size nx1 where n is the number of features.
+ * @param stepSize - step size across iterations
+ * @param iter - Iteration number
+ *
+ * @return weightsNew - Column matrix containing updated weights
+ * @return reg_val - regularization value
+ */
+ def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int):
+ (DoubleMatrix, Double)
+}
+
+class SimpleUpdater extends Updater {
+ override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
+ stepSize: Double, iter: Int): (DoubleMatrix, Double) = {
+ val normGradient = gradient.mul(stepSize / math.sqrt(iter))
+ (weightsOld.sub(normGradient), 0)
+ }
+}
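SimpleUpdater applies wNew = wOld - (stepSize / sqrt(iter)) * gradient with no regularization; a quick standalone check with illustrative values:

    import org.jblas.DoubleMatrix
    import spark.mllib.optimization.SimpleUpdater

    val wOld = new DoubleMatrix(Array(1.0, -2.0))   // 2x1 column vector
    val grad = new DoubleMatrix(Array(0.5, 0.5))
    // step = 1.0 / sqrt(4) = 0.5, so wNew = [1.0 - 0.25, -2.0 - 0.25] = [0.75, -2.25]
    val (wNew, regVal) = new SimpleUpdater().compute(wOld, grad, 1.0, 4)
    println(wNew + "  reg = " + regVal)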
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
new file mode 100644
index 0000000000..7da96397a6
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.recommendation
+
+import scala.collection.mutable.{ArrayBuffer, BitSet}
+import scala.util.Random
+import scala.util.Sorting
+
+import spark.{HashPartitioner, Partitioner, SparkContext, RDD}
+import spark.storage.StorageLevel
+import spark.KryoRegistrator
+import spark.SparkContext._
+
+import com.esotericsoftware.kryo.Kryo
+import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
+
+
+/**
+ * Out-link information for a user or product block. This includes the original user/product IDs
+ * of the elements within this block, and the list of destination blocks that each user or
+ * product will need to send its feature vector to.
+ */
+private[recommendation] case class OutLinkBlock(
+ elementIds: Array[Int], shouldSend: Array[BitSet])
+
+
+/**
+ * In-link information for a user (or product) block. This includes the original user/product IDs
+ * of the elements within this block, as well as an array of indices and ratings that specify
+ * which user in the block will be rated by which products from each product block (or vice-versa).
+ * Specifically, if this InLinkBlock is for users, ratingsForBlock(b)(i) will contain two arrays,
+ * indices and ratings, for the i'th product that will be sent to us by product block b (call this
+ * P). These arrays represent the users that product P had ratings for (by their index in this
+ * block), as well as the corresponding rating for each one. We can thus use this information when
+ * we get product block b's message to update the corresponding users.
+ */
+private[recommendation] case class InLinkBlock(
+ elementIds: Array[Int], ratingsForBlock: Array[Array[(Array[Int], Array[Double])]])
+
+
+/**
+ * A more compact class to represent a rating than Tuple3[Int, Int, Double].
+ */
+private[recommendation] case class Rating(user: Int, product: Int, rating: Double)
+
+
+/**
+ * Alternating Least Squares matrix factorization.
+ *
+ * This is a blocked implementation of the ALS factorization algorithm that groups the two sets
+ * of factors (referred to as "users" and "products") into blocks and reduces communication by only
+ * sending one copy of each user vector to each product block on each iteration, and only for the
+ * product blocks that need that user's feature vector. This is achieved by precomputing some
+ * information about the ratings matrix to determine the "out-links" of each user (which blocks of
+ * products it will contribute to) and "in-link" information for each product (which of the feature
+ * vectors it receives from each user block it will depend on). This allows us to send only an
+ * array of feature vectors between each user block and product block, and have the product block
+ * find the users' ratings and update the products based on these messages.
+ */
+class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double)
+ extends Serializable
+{
+ def this() = this(-1, 10, 10, 0.01)
+
+ /**
+ * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
+ * number of blocks. Default: -1.
+ */
+ def setBlocks(numBlocks: Int): ALS = {
+ this.numBlocks = numBlocks
+ this
+ }
+
+ /** Set the rank of the feature matrices computed (number of features). Default: 10. */
+ def setRank(rank: Int): ALS = {
+ this.rank = rank
+ this
+ }
+
+ /** Set the number of iterations to run. Default: 10. */
+ def setIterations(iterations: Int): ALS = {
+ this.iterations = iterations
+ this
+ }
+
+ /** Set the regularization parameter, lambda. Default: 0.01. */
+ def setLambda(lambda: Double): ALS = {
+ this.lambda = lambda
+ this
+ }
+
+ /**
+ * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
+ * Returns a MatrixFactorizationModel with feature vectors for each user and product.
+ */
+ def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = {
+ val numBlocks = if (this.numBlocks == -1) {
+ math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
+ } else {
+ this.numBlocks
+ }
+
+ val partitioner = new HashPartitioner(numBlocks)
+
+ val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, Rating(u, p, r)) }
+ val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, Rating(p, u, r)) }
+
+ val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
+ val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
+
+ // Initialize user and product factors randomly
+ val seed = new Random().nextInt()
+ var users = userOutLinks.mapValues(_.elementIds.map(u => randomFactor(rank, seed ^ u)))
+ var products = productOutLinks.mapValues(_.elementIds.map(p => randomFactor(rank, seed ^ ~p)))
+
+ for (iter <- 0 until iterations) {
+ // perform ALS update
+ products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda)
+ users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda)
+ }
+
+ // Flatten and cache the two final RDDs to un-block them
+ val usersOut = users.join(userOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
+ for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
+ }
+ val productsOut = products.join(productOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
+ for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
+ }
+
+ usersOut.persist()
+ productsOut.persist()
+
+ new MatrixFactorizationModel(rank, usersOut, productsOut)
+ }
+
+ /**
+ * Make the out-links table for a block of the users (or products) dataset given the list of
+ * (user, product, rating) values for the users in that block (or the opposite for products).
+ */
+ private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating]): OutLinkBlock = {
+ val userIds = ratings.map(_.user).distinct.sorted
+ val numUsers = userIds.length
+ val userIdToPos = userIds.zipWithIndex.toMap
+ val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
+ for (r <- ratings) {
+ shouldSend(userIdToPos(r.user))(r.product % numBlocks) = true
+ }
+ OutLinkBlock(userIds, shouldSend)
+ }
+
+ /**
+ * Make the in-links table for a block of the users (or products) dataset given a list of
+ * (user, product, rating) values for the users in that block (or the opposite for products).
+ */
+ private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating]): InLinkBlock = {
+ val userIds = ratings.map(_.user).distinct.sorted
+ val numUsers = userIds.length
+ val userIdToPos = userIds.zipWithIndex.toMap
+ // Split out our ratings by product block
+ val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
+ for (r <- ratings) {
+ blockRatings(r.product % numBlocks) += r
+ }
+ val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
+ for (productBlock <- 0 until numBlocks) {
+ // Create an array of (product, Seq(Rating)) ratings
+ val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
+ // Sort them by product ID
+ val ordering = new Ordering[(Int, ArrayBuffer[Rating])] {
+ def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int = a._1 - b._1
+ }
+ Sorting.quickSort(groupedRatings)(ordering)
+ // Translate the user IDs to indices based on userIdToPos
+ ratingsForBlock(productBlock) = groupedRatings.map { case (p, rs) =>
+ (rs.view.map(r => userIdToPos(r.user)).toArray, rs.view.map(_.rating).toArray)
+ }
+ }
+ InLinkBlock(userIds, ratingsForBlock)
+ }
+
+ /**
+ * Make RDDs of InLinkBlocks and OutLinkBlocks given an RDD of (blockId, (u, p, r)) values for
+ * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
+ * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
+ */
+ private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)])
+ : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
+ {
+ val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
+ val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
+ val ratings = elements.map{_._2}.toArray
+ val inLinkBlock = makeInLinkBlock(numBlocks, ratings)
+ val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
+ Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
+ }, true)
+ links.persist(StorageLevel.MEMORY_AND_DISK)
+ (links.mapValues(_._1), links.mapValues(_._2))
+ }
+
+ /**
+ * Make a random factor vector with the given seed.
+ * TODO: Initialize things using mapPartitionsWithIndex to make it faster?
+ */
+ private def randomFactor(rank: Int, seed: Int): Array[Double] = {
+ val rand = new Random(seed)
+ Array.fill(rank)(rand.nextDouble)
+ }
+
+ /**
+ * Compute the user feature vectors given the current products (or vice-versa). This first joins
+ * the products with their out-links to generate a set of messages to each destination block
+ * (specifically, the features for the products that user block cares about), then groups these
+ * by destination and joins them with the in-link info to figure out how to update each user.
+ * It returns an RDD of new feature vectors for each user block.
+ */
+ private def updateFeatures(
+ products: RDD[(Int, Array[Array[Double]])],
+ productOutLinks: RDD[(Int, OutLinkBlock)],
+ userInLinks: RDD[(Int, InLinkBlock)],
+ partitioner: Partitioner,
+ rank: Int,
+ lambda: Double)
+ : RDD[(Int, Array[Array[Double]])] =
+ {
+ val numBlocks = products.partitions.size
+ productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
+ val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
+ for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
+ if (outLinkBlock.shouldSend(p)(userBlock)) {
+ toSend(userBlock) += factors(p)
+ }
+ }
+ toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
+ }.groupByKey(partitioner)
+ .join(userInLinks)
+ .mapValues{ case (messages, inLinkBlock) => updateBlock(messages, inLinkBlock, rank, lambda) }
+ }
+
+ /**
+ * Compute the new feature vectors for a block of the users matrix given the list of factors
+ * it received from each product and its InLinkBlock.
+ */
+ def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
+ rank: Int, lambda: Double)
+ : Array[Array[Double]] =
+ {
+ // Sort the incoming block factor messages by block ID and make them an array
+ val blockFactors = messages.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
+ val numBlocks = blockFactors.length
+ val numUsers = inLinkBlock.elementIds.length
+
+ // We'll sum up the XtXes using vectors that represent only the lower-triangular part, since
+ // the matrices are symmetric
+ val triangleSize = rank * (rank + 1) / 2
+ val userXtX = Array.fill(numUsers)(DoubleMatrix.zeros(triangleSize))
+ val userXy = Array.fill(numUsers)(DoubleMatrix.zeros(rank))
+
+ // Some temp variables to avoid memory allocation
+ val tempXtX = DoubleMatrix.zeros(triangleSize)
+ val fullXtX = DoubleMatrix.zeros(rank, rank)
+
+ // Compute the XtX and Xy values for each user by adding products it rated in each product block
+ for (productBlock <- 0 until numBlocks) {
+ for (p <- 0 until blockFactors(productBlock).length) {
+ val x = new DoubleMatrix(blockFactors(productBlock)(p))
+ fillXtX(x, tempXtX)
+ val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
+ for (i <- 0 until us.length) {
+ userXtX(us(i)).addi(tempXtX)
+ SimpleBlas.axpy(rs(i), x, userXy(us(i)))
+ }
+ }
+ }
+
+ // Solve the least-squares problem for each user and return the new feature vectors
+ userXtX.zipWithIndex.map{ case (triangularXtX, index) =>
+ // Compute the full XtX matrix from the lower-triangular part we got above
+ fillFullMatrix(triangularXtX, fullXtX)
+ // Add regularization
+ (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
+ // Solve the resulting matrix, which is symmetric and positive-definite
+ Solve.solvePositive(fullXtX, userXy(index)).data
+ }
+ }
+
+ /**
+ * Set xtxDest to the lower-triangular part of x transpose * x. For efficiency in summing
+ * these matrices, we store xtxDest as only rank * (rank+1) / 2 values, namely the values
+ * at (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), etc., in that order.
+ */
+ private def fillXtX(x: DoubleMatrix, xtxDest: DoubleMatrix) {
+ var i = 0
+ var pos = 0
+ while (i < x.length) {
+ var j = 0
+ while (j <= i) {
+ xtxDest.data(pos) = x.data(i) * x.data(j)
+ pos += 1
+ j += 1
+ }
+ i += 1
+ }
+ }
+
+ /**
+ * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
+ * matrix that it represents, storing it into destMatrix.
+ */
+ private def fillFullMatrix(triangularMatrix: DoubleMatrix, destMatrix: DoubleMatrix) {
+ val rank = destMatrix.rows
+ var i = 0
+ var pos = 0
+ while (i < rank) {
+ var j = 0
+ while (j <= i) {
+ destMatrix.data(i*rank + j) = triangularMatrix.data(pos)
+ destMatrix.data(j*rank + i) = triangularMatrix.data(pos)
+ pos += 1
+ j += 1
+ }
+ i += 1
+ }
+ }
+}
+
+
+/**
+ * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
+ */
+object ALS {
+ /**
+ * Train a matrix factorization model given an RDD of ratings assigned by users to products,
+ * in the form of (userID, productID, rating) tuples. We approximate the ratings matrix as the
+ * product of two lower-rank matrices of a given rank (number of features). To solve for these
+ * features, we run a given number of iterations of ALS. This is done using a level of
+ * parallelism given by `blocks`.
+ *
+ * @param ratings RDD of (userID, productID, rating) tuples
+ * @param rank number of features to use
+ * @param iterations number of iterations of ALS (recommended: 10-20)
+ * @param lambda regularization factor (recommended: 0.01)
+ * @param blocks level of parallelism to split computation into
+ */
+ def train(
+ ratings: RDD[(Int, Int, Double)],
+ rank: Int,
+ iterations: Int,
+ lambda: Double,
+ blocks: Int)
+ : MatrixFactorizationModel =
+ {
+ new ALS(blocks, rank, iterations, lambda).train(ratings)
+ }
+
+ /**
+ * Train a matrix factorization model given an RDD of ratings assigned by users to products,
+ * in the form of (userID, productID, rating) tuples. We approximate the ratings matrix as the
+ * product of two lower-rank matrices of a given rank (number of features). To solve for these
+ * features, we run a given number of iterations of ALS. The level of parallelism is determined
+ * automatically based on the number of partitions in `ratings`.
+ *
+ * @param ratings RDD of (userID, productID, rating) tuples
+ * @param rank number of features to use
+ * @param iterations number of iterations of ALS (recommended: 10-20)
+ * @param lambda regularization factor (recommended: 0.01)
+ */
+ def train(ratings: RDD[(Int, Int, Double)], rank: Int, iterations: Int, lambda: Double)
+ : MatrixFactorizationModel =
+ {
+ train(ratings, rank, iterations, lambda, -1)
+ }
+
+ /**
+ * Train a matrix factorization model given an RDD of ratings assigned by users to products,
+ * in the form of (userID, productID, rating) tuples. We approximate the ratings matrix as the
+ * product of two lower-rank matrices of a given rank (number of features). To solve for these
+ * features, we run a given number of iterations of ALS. The level of parallelism is determined
+ * automatically based on the number of partitions in `ratings`.
+ *
+ * @param ratings RDD of (userID, productID, rating) tuples
+ * @param rank number of features to use
+ * @param iterations number of iterations of ALS (recommended: 10-20)
+ */
+ def train(ratings: RDD[(Int, Int, Double)], rank: Int, iterations: Int)
+ : MatrixFactorizationModel =
+ {
+ train(ratings, rank, iterations, 0.01, -1)
+ }
+
+ private class ALSRegistrator extends KryoRegistrator {
+ override def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[Rating])
+ }
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 5 && args.length != 6) {
+ println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]")
+ System.exit(1)
+ }
+ val (master, ratingsFile, rank, iters, outputDir) =
+ (args(0), args(1), args(2).toInt, args(3).toInt, args(4))
+ val blocks = if (args.length == 6) args(5).toInt else -1
+ System.setProperty("spark.serializer", "spark.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+ System.setProperty("spark.kryo.referenceTracking", "false")
+ System.setProperty("spark.locality.wait", "10000")
+ val sc = new SparkContext(master, "ALS")
+ val ratings = sc.textFile(ratingsFile).map { line =>
+ val fields = line.split(',')
+ (fields(0).toInt, fields(1).toInt, fields(2).toDouble)
+ }
+ val model = ALS.train(ratings, rank, iters, 0.01, blocks)
+ model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
+ .saveAsTextFile(outputDir + "/userFeatures")
+ model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
+ .saveAsTextFile(outputDir + "/productFeatures")
+ println("Final user/product features written to " + outputDir)
+ System.exit(0)
+ }
+}
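Usage sketch (not part of the patch): training and querying the ALS model above. The CSV path, its user,product,rating layout, and the parameter values are illustrative.

    import spark.SparkContext
    import spark.mllib.recommendation.ALS

    val sc = new SparkContext("local", "ALSExample")
    // Parse "user,product,rating" lines into (Int, Int, Double) triples.
    val ratings = sc.textFile("data/ratings.csv").map { line =>
      val Array(user, product, rating) = line.split(',')
      (user.toInt, product.toInt, rating.toDouble)
    }
    // rank = 10 features, 10 iterations, lambda = 0.01, auto-selected block count.
    val model = ALS.train(ratings, 10, 10, 0.01)
    println("Predicted rating for user 1, product 2: " + model.predict(1, 2))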
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala
new file mode 100644
index 0000000000..38637b3dd1
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.recommendation
+
+import spark.RDD
+import spark.SparkContext._
+
+import org.jblas._
+
+class MatrixFactorizationModel(
+ val rank: Int,
+ val userFeatures: RDD[(Int, Array[Double])],
+ val productFeatures: RDD[(Int, Array[Double])])
+ extends Serializable
+{
+ /** Predict the rating of one user for one product. */
+ def predict(user: Int, product: Int): Double = {
+ val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
+ val productVector = new DoubleMatrix(productFeatures.lookup(product).head)
+ userVector.dot(productVector)
+ }
+
+ // TODO: Figure out what good bulk prediction methods would look like.
+ // Probably want a way to get the top users for a product or vice-versa.
+}
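The predicted rating is simply the dot product of the two feature vectors; the sketch below recomputes it by hand, assuming a trained `model` and illustrative IDs that exist in the data.

    import spark.SparkContext._

    // Assuming `model: MatrixFactorizationModel` from the ALS example above.
    val userVec = model.userFeatures.lookup(1).head
    val productVec = model.productFeatures.lookup(2).head
    val manual = userVec.zip(productVec).map { case (u, p) => u * p }.sum
    println("manual = " + manual + ", model.predict = " + model.predict(1, 2))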
diff --git a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
new file mode 100644
index 0000000000..bb294c2257
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Logistic Regression using Stochastic Gradient Descent.
+ * Based on Matlab code written by John Duchi.
+ */
+class LogisticRegressionModel(
+ val weights: DoubleMatrix,
+ val intercept: Double,
+ val losses: Array[Double]) extends RegressionModel {
+
+ override def predict(testData: spark.RDD[Array[Double]]) = {
+ testData.map { x =>
+ val margin = new DoubleMatrix(1, x.length, x:_*).mmul(this.weights).get(0) + this.intercept
+ 1.0 / (1.0 + math.exp(-margin))
+ }
+ }
+
+ override def predict(testData: Array[Double]): Double = {
+ val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+ val margin = dataMat.mmul(this.weights).get(0) + this.intercept
+ 1.0 / (1.0 + math.exp(-margin))
+ }
+}
+
+class LogisticRegression private (var stepSize: Double, var miniBatchFraction: Double,
+ var numIters: Int)
+ extends Logging {
+
+ /**
+ * Construct a LogisticRegression object with default parameters
+ */
+ def this() = this(1.0, 1.0, 100)
+
+ /**
+ * Set the step size per-iteration of SGD. Default 1.0.
+ */
+ def setStepSize(step: Double) = {
+ this.stepSize = step
+ this
+ }
+
+ /**
+ * Set fraction of data to be used for each SGD iteration. Default 1.0.
+ */
+ def setMiniBatchFraction(fraction: Double) = {
+ this.miniBatchFraction = fraction
+ this
+ }
+
+ /**
+ * Set the number of iterations for SGD. Default 100.
+ */
+ def setNumIterations(iters: Int) = {
+ this.numIters = iters
+ this
+ }
+
+ def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
+ // Add an extra variable consisting of all 1.0's for the intercept.
+ val data = input.map { case (y, features) =>
+ (y, Array(1.0, features:_*))
+ }
+
+ val (weights, losses) = GradientDescent.runMiniBatchSGD(
+ data, new LogisticGradient(), new SimpleUpdater(), stepSize, numIters, miniBatchFraction)
+
+ val weightsScaled = weights.getRange(1, weights.length)
+ val intercept = weights.get(0)
+
+ val model = new LogisticRegressionModel(weightsScaled, intercept, losses)
+
+ logInfo("Final model weights " + model.weights)
+ logInfo("Final model intercept " + model.intercept)
+ logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
+ model
+ }
+}
+
+/**
+ * Top-level methods for calling Logistic Regression.
+ */
+object LogisticRegression {
+
+ /**
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double)
+ : LogisticRegressionModel =
+ {
+ new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input)
+ }
+
+ /**
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. We use the entire data set to update
+ * the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LogisticRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double)
+ : LogisticRegressionModel =
+ {
+ train(input, numIterations, stepSize, 1.0)
+ }
+
+ /**
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to update
+ * the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LogisticRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int)
+ : LogisticRegressionModel =
+ {
+ train(input, numIterations, 1.0, 1.0)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ println("Usage: LogisticRegression <master> <input_dir> <step_size> <niters>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "LogisticRegression")
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = LogisticRegression.train(data, args(3).toInt, args(2).toDouble)
+
+ sc.stop()
+ }
+}
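Usage sketch (not part of the patch): training the logistic regression classifier above on a tiny in-memory dataset; the data and parameters are illustrative.

    import spark.SparkContext
    import spark.mllib.regression.LogisticRegression

    val sc = new SparkContext("local", "LRExample")
    // (label, features) pairs with labels 0.0 or 1.0.
    val data = sc.parallelize(Seq(
      (1.0, Array(2.5, 3.0)), (1.0, Array(3.2, 2.7)),
      (0.0, Array(-0.3, 0.1)), (0.0, Array(0.2, -0.4))))
    // 50 iterations of gradient descent with step size 1.0, full batch per iteration.
    val model = LogisticRegression.train(data, 50, 1.0)
    // predict returns the estimated probability of the positive class.
    println(model.predict(Array(3.0, 3.0)))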
diff --git a/mllib/src/main/scala/spark/mllib/regression/LogisticRegressionGenerator.scala b/mllib/src/main/scala/spark/mllib/regression/LogisticRegressionGenerator.scala
new file mode 100644
index 0000000000..8094d22405
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LogisticRegressionGenerator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+object LogisticRegressionGenerator {
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ println("Usage: LogisticRegressionGenerator " +
+ "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+ System.exit(1)
+ }
+
+ val sparkMaster: String = args(0)
+ val outputPath: String = args(1)
+ val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
+ val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
+ val parts: Int = if (args.length > 4) args(4).toInt else 2
+ val eps = 3
+
+ val sc = new SparkContext(sparkMaster, "LogisticRegressionGenerator")
+
+ val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+ val rnd = new Random(42 + idx)
+
+ val y = if (idx % 2 == 0) 0 else 1
+ val x = Array.fill[Double](nfeatures) {
+ rnd.nextGaussian() + (y * eps)
+ }
+ (y, x)
+ }
+
+ MLUtils.saveLabeledData(data, outputPath)
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/Regression.scala b/mllib/src/main/scala/spark/mllib/regression/Regression.scala
new file mode 100644
index 0000000000..645204ddf3
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/Regression.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.RDD
+
+trait RegressionModel {
+ /**
+ * Predict values for the given data set using the model trained.
+ *
+ * @param testData RDD representing data points to be predicted
+ * @return RDD[Double] where each entry contains the corresponding prediction
+ */
+ def predict(testData: RDD[Array[Double]]): RDD[Double]
+
+ /**
+ * Predict values for a single data point using the model trained.
+ *
+ * @param testData array representing a single data point
+ * @return Double prediction from the trained model
+ */
+ def predict(testData: Array[Double]): Double
+}
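RegressionModel is the shared contract for the models in this package. The toy implementation below (illustrative, not part of the patch) just shows the two methods an implementer must provide.

    import spark.RDD
    import spark.mllib.regression.RegressionModel

    // A model that always predicts the same value, regardless of the features.
    class ConstantModel(value: Double) extends RegressionModel {
      override def predict(testData: RDD[Array[Double]]): RDD[Double] = {
        val v = value                // copy to a local so the closure stays serializable
        testData.map(_ => v)
      }
      override def predict(testData: Array[Double]): Double = value
    }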
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
new file mode 100644
index 0000000000..7c7f912b43
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+import org.jblas.Solve
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+/**
+ * Ridge Regression from Joseph Gonzalez's implementation in MLBase
+ */
+class RidgeRegressionModel(
+ val weights: DoubleMatrix,
+ val intercept: Double,
+ val lambdaOpt: Double,
+ val lambdas: Seq[(Double, Double, DoubleMatrix)])
+ extends RegressionModel {
+
+ override def predict(testData: RDD[Array[Double]]): RDD[Double] = {
+ testData.map { x =>
+ (new DoubleMatrix(1, x.length, x:_*).mmul(this.weights)).get(0) + this.intercept
+ }
+ }
+
+ override def predict(testData: Array[Double]): Double = {
+ (new DoubleMatrix(1, testData.length, testData:_*).mmul(this.weights)).get(0) + this.intercept
+ }
+}
+
+class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
+ extends Logging {
+
+ def this() = this(0.0, 100.0)
+
+  /**
+   * Set the lower bound for the binary search over lambda. Default is 0.0.
+   */
+ def setLowLambda(low: Double) = {
+ this.lambdaLow = low
+ this
+ }
+
+  /**
+   * Set the upper bound for the binary search over lambda. Default is 100.0.
+   */
+ def setHighLambda(hi: Double) = {
+ this.lambdaHigh = hi
+ this
+ }
+
+ def train(input: RDD[(Double, Array[Double])]): RidgeRegressionModel = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val nexamples: Long = input.count()
+
+ val (yMean, xColMean, xColSd) = MLUtils.computeStats(input, nfeatures, nexamples)
+
+ val data = input.map { case(y, features) =>
+ val yNormalized = y - yMean
+ val featuresMat = new DoubleMatrix(nfeatures, 1, features:_*)
+ val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
+ (yNormalized, featuresNormalized.toArray)
+ }
+
+ // Compute XtX - Size of XtX is nfeatures by nfeatures
+ val XtX: DoubleMatrix = data.map { case (y, features) =>
+ val x = new DoubleMatrix(1, features.length, features:_*)
+ x.transpose().mmul(x)
+ }.reduce(_.addi(_))
+
+ // Compute Xt*y - Size of Xty is nfeatures by 1
+ val Xty: DoubleMatrix = data.map { case (y, features) =>
+ new DoubleMatrix(features.length, 1, features:_*).mul(y)
+ }.reduce(_.addi(_))
+
+    // Define a function to compute the leave-one-out cross-validation error
+    // for a single value of lambda
+ def crossValidate(lambda: Double): (Double, Double, DoubleMatrix) = {
+      // Compute the ridge regression parameter vector in closed form
+
+      // Ridge Regression parameter = inv(XtX + \lambda*I) * Xty
+ val XtXlambda = DoubleMatrix.eye(nfeatures).muli(lambda).addi(XtX)
+ val w = Solve.solveSymmetric(XtXlambda, Xty)
+
+ val invXtX = Solve.solveSymmetric(XtXlambda, DoubleMatrix.eye(nfeatures))
+
+      // compute the leave-one-out cross-validation (PRESS) score
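+      // The leave-one-out residual of example i has the closed form
+      // (y_i - yhat_i) / (1 - H_ii), where H = X * inv(XtX + lambda*I) * Xt is the
+      // hat matrix, so the score is computed in a single pass without refitting.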
+ val cverror = data.map {
+ case (y, features) =>
+ val x = new DoubleMatrix(features.length, 1, features:_*)
+ val yhat = w.transpose().mmul(x).get(0)
+ val H_ii = x.transpose().mmul(invXtX).mmul(x).get(0)
+ val residual = (y - yhat) / (1.0 - H_ii)
+ residual * residual
+ }.reduce(_ + _) / nexamples
+
+ (lambda, cverror, w)
+ }
+
+ // Binary search for the best assignment to lambda.
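+    // This bracket-shrinking search evaluates the error at the 1/4 and 3/4 points of
+    // [low, high] and drops the quarter of the interval next to the worse of the two;
+    // it assumes the cross-validation error is roughly unimodal in lambda.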
+ def binSearch(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = {
+ val buffer = mutable.ListBuffer.empty[(Double, Double, DoubleMatrix)]
+
+ @tailrec
+ def loop(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = {
+ val mid = (high - low) / 2 + low
+ val lowValue = crossValidate((mid - low) / 2 + low)
+ val highValue = crossValidate((high - mid) / 2 + mid)
+ val (newLow, newHigh) = if (lowValue._2 < highValue._2) {
+ (low, mid + (high-low)/4)
+ } else {
+ (mid - (high-low)/4, high)
+ }
+ if (newHigh - newLow > 1.0E-7) {
+ buffer += lowValue += highValue
+ loop(newLow, newHigh)
+ } else {
+ buffer += lowValue += highValue
+ buffer.result()
+ }
+ }
+
+ loop(low, high)
+ }
+
+ // Actually compute the best lambda
+ val lambdas = binSearch(lambdaLow, lambdaHigh).sortBy(_._1)
+
+ // Find the best parameter set by taking the lowest cverror.
+ val (lambdaOpt, cverror, weights) = lambdas.reduce((a, b) => if (a._2 < b._2) a else b)
+
+ // Return the model which contains the solution
+ val weightsScaled = weights.div(xColSd)
+ val intercept = yMean - (weights.transpose().mmul(xColMean.div(xColSd)).get(0))
+ val model = new RidgeRegressionModel(weightsScaled, intercept, lambdaOpt, lambdas)
+
+ logInfo("RidgeRegression: optimal lambda " + model.lambdaOpt)
+ logInfo("RidgeRegression: optimal weights " + model.weights)
+ logInfo("RidgeRegression: optimal intercept " + model.intercept)
+ logInfo("RidgeRegression: cross-validation error " + cverror)
+
+ model
+ }
+}
+
+/**
+ * Top-level methods for calling Ridge Regression.
+ */
+object RidgeRegression {
+
+ /**
+ * Train a ridge regression model given an RDD of (response, features) pairs.
+ * We use the closed form solution to compute the cross-validation score for
+ * a given lambda. The optimal lambda is computed by performing binary search
+ * between the provided bounds of lambda.
+ *
+ * @param input RDD of (response, array of features) pairs.
+ * @param lambdaLow lower bound used in binary search for lambda
+ * @param lambdaHigh upper bound used in binary search for lambda
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ lambdaLow: Double,
+ lambdaHigh: Double)
+ : RidgeRegressionModel =
+ {
+ new RidgeRegression(lambdaLow, lambdaHigh).train(input)
+ }
+
+ /**
+ * Train a ridge regression model given an RDD of (response, features) pairs.
+ * We use the closed form solution to compute the cross-validation score for
+ * a given lambda. The optimal lambda is computed by performing binary search
+ * between lambda values of 0 and 100.
+ *
+ * @param input RDD of (response, array of features) pairs.
+ */
+ def train(input: RDD[(Double, Array[Double])]) : RidgeRegressionModel = {
+ train(input, 0.0, 100.0)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 2) {
+ println("Usage: RidgeRegression <master> <input_dir>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "RidgeRegression")
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = RidgeRegression.train(data, 0, 1000)
+ sc.stop()
+ }
+}
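A rough usage sketch (not part of the patch), assuming labeled data with two features already sits under the placeholder path /tmp/ridge-data in the "label,f1 f2 ..." format used by MLUtils:

import spark.SparkContext
import spark.mllib.regression.RidgeRegression
import spark.mllib.util.MLUtils

object RidgeRegressionExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "RidgeRegressionExample")
    val data = MLUtils.loadLabeledData(sc, "/tmp/ridge-data").cache()
    // Search for lambda over the default [0, 100] range and fit the model.
    val model = RidgeRegression.train(data, 0.0, 100.0)
    println("lambdaOpt: " + model.lambdaOpt + ", intercept: " + model.intercept)
    // Predict a single point; it must have the same number of features as the training data.
    println("prediction: " + model.predict(Array(0.5, 1.5)))
    sc.stop()
  }
}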
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegressionGenerator.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegressionGenerator.scala
new file mode 100644
index 0000000000..c2260ae286
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegressionGenerator.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+
+object RidgeRegressionGenerator {
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ println("Usage: RidgeRegressionGenerator " +
+ "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+ System.exit(1)
+ }
+
+ val sparkMaster: String = args(0)
+ val outputPath: String = args(1)
+    val nexamples: Int = args(2).toInt
+    val nfeatures: Int = args(3).toInt
+    val parts: Int = args(4).toInt
+    val eps = 10  // standard deviation of the Gaussian noise added to the labels
+
+ org.jblas.util.Random.seed(42)
+ val sc = new SparkContext(sparkMaster, "RidgeRegressionGenerator")
+
+ // Random values distributed uniformly in [-0.5, 0.5]
+ val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+ w.put(0, 0, 10)
+ w.put(1, 0, 10)
+
+ val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until parts, parts).flatMap { p =>
+ org.jblas.util.Random.seed(42 + p)
+ val examplesInPartition = nexamples / parts
+
+ val X = DoubleMatrix.rand(examplesInPartition, nfeatures)
+ val y = X.mmul(w)
+
+ val rnd = new Random(42 + p)
+
+ val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps)
+ val yObs = new DoubleMatrix(normalValues).addi(y)
+
+ Iterator.tabulate(examplesInPartition) { i =>
+ (yObs.get(i, 0), X.getRow(i).toArray)
+ }
+ }
+
+ MLUtils.saveLabeledData(data, outputPath)
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
new file mode 100644
index 0000000000..b5e564df6d
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import spark.{RDD, SparkContext}
+import spark.SparkContext._
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Helper methods to load, save and compute statistics on labeled data.
+ * Data format:
+ *   <l>,<f1> <f2> ...
+ * where <f1>, <f2>, ... are the feature values (Double) and <l> is the corresponding label (Double).
+ */
+object MLUtils {
+
+ /**
+ * @param sc SparkContext
+   * @param dir Directory containing the input data files.
+ * @return An RDD of tuples. For each tuple, the first element is the label, and the second
+ * element represents the feature values (an array of Double).
+ */
+ def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = {
+ sc.textFile(dir).map { line =>
+ val parts = line.split(",")
+ val label = parts(0).toDouble
+ val features = parts(1).trim().split(" ").map(_.toDouble)
+ (label, features)
+ }
+ }
+
+ def saveLabeledData(data: RDD[(Double, Array[Double])], dir: String) {
+ val dataStr = data.map(x => x._1 + "," + x._2.mkString(" "))
+ dataStr.saveAsTextFile(dir)
+ }
+
+ /**
+ * Utility function to compute mean and standard deviation on a given dataset.
+ *
+ * @param data - input data set whose statistics are computed
+ * @param nfeatures - number of features
+ * @param nexamples - number of examples in input dataset
+ *
+   * @return (yMean, xColMean, xColSd) - Tuple consisting of
+   *     yMean - mean of the labels
+   *     xColMean - column vector (nfeatures x 1) with the mean of every feature column
+   *     xColSd - column vector (nfeatures x 1) with the standard deviation of every feature column.
+ */
+ def computeStats(data: RDD[(Double, Array[Double])], nfeatures: Int, nexamples: Long):
+ (Double, DoubleMatrix, DoubleMatrix) = {
+ val yMean: Double = data.map { case (y, features) => y }.reduce(_ + _) / nexamples
+
+    // NOTE: this shuffles the data by column index to compute per-column sums and sums of squares.
+ val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { case(y, features) =>
+ val nCols = features.length
+ // Traverse over every column and emit (col, value, value^2)
+ Iterator.tabulate(nCols) { i =>
+ (i, (features(i), features(i)*features(i)))
+ }
+ }.reduceByKey { case(x1, x2) =>
+ (x1._1 + x2._1, x1._2 + x2._2)
+ }
+ val xColSumsMap = xColSumSq.collectAsMap()
+
+ val xColMean = DoubleMatrix.zeros(nfeatures, 1)
+ val xColSd = DoubleMatrix.zeros(nfeatures, 1)
+
+    // Compute the mean and population variance (dividing by n, not n - 1) for each column
+ var col = 0
+ while (col < nfeatures) {
+ xColMean.put(col, xColSumsMap(col)._1 / nexamples)
+ val variance =
+ (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / (nexamples)
+ xColSd.put(col, math.sqrt(variance))
+ col += 1
+ }
+
+ (yMean, xColMean, xColSd)
+ }
+
+ /**
+ * Return the squared Euclidean distance between two vectors.
+ */
+ def squaredDistance(v1: Array[Double], v2: Array[Double]): Double = {
+ if (v1.length != v2.length) {
+ throw new IllegalArgumentException("Vector sizes don't match")
+ }
+ var i = 0
+ var sum = 0.0
+ while (i < v1.length) {
+ sum += (v1(i) - v2(i)) * (v1(i) - v2(i))
+ i += 1
+ }
+ sum
+ }
+}
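A small round-trip sketch (not part of the patch); the /tmp/labeled-data directory is a placeholder:

import spark.SparkContext
import spark.mllib.util.MLUtils

object MLUtilsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "MLUtilsExample")
    // Two labeled points; each is written out as a line of the form "label,f1 f2".
    val data = sc.parallelize(Seq((1.0, Array(2.0, 3.0)), (0.0, Array(4.0, 1.0))))
    MLUtils.saveLabeledData(data, "/tmp/labeled-data")
    val loaded = MLUtils.loadLabeledData(sc, "/tmp/labeled-data")

    // Label mean plus per-column mean and standard deviation (as nfeatures x 1 vectors).
    val (yMean, xColMean, xColSd) = MLUtils.computeStats(loaded, 2, 2L)
    println("yMean: " + yMean + ", xColMean: " + xColMean + ", xColSd: " + xColSd)

    println("squared distance: " + MLUtils.squaredDistance(Array(2.0, 3.0), Array(4.0, 1.0)))
    sc.stop()
  }
}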
diff --git a/mllib/src/test/resources/log4j.properties b/mllib/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..a112e0b506
--- /dev/null
+++ b/mllib/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file mllib/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=mllib/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala
new file mode 100644
index 0000000000..bebade9afb
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.clustering
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+import spark.SparkContext._
+
+import org.jblas._
+
+
+class KMeansSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ val EPSILON = 1e-4
+
+ import KMeans.{RANDOM, K_MEANS_PARALLEL}
+
+ def prettyPrint(point: Array[Double]): String = point.mkString("(", ", ", ")")
+
+ def prettyPrint(points: Array[Array[Double]]): String = {
+ points.map(prettyPrint).mkString("(", "; ", ")")
+ }
+
+  // Maximum absolute difference between coordinates (L-infinity distance) of two points
+ def distance1(v1: Array[Double], v2: Array[Double]): Double = {
+ v1.zip(v2).map{ case (a, b) => math.abs(a-b) }.max
+ }
+
+ // Assert that two vectors are equal within tolerance EPSILON
+ def assertEqual(v1: Array[Double], v2: Array[Double]) {
+ def errorMessage = prettyPrint(v1) + " did not equal " + prettyPrint(v2)
+ assert(v1.length == v2.length, errorMessage)
+ assert(distance1(v1, v2) <= EPSILON, errorMessage)
+ }
+
+ // Assert that two sets of points are equal, within EPSILON tolerance
+ def assertSetsEqual(set1: Array[Array[Double]], set2: Array[Array[Double]]) {
+ def errorMessage = prettyPrint(set1) + " did not equal " + prettyPrint(set2)
+ assert(set1.length == set2.length, errorMessage)
+ for (v <- set1) {
+ val closestDistance = set2.map(w => distance1(v, w)).min
+ if (closestDistance > EPSILON) {
+ fail(errorMessage)
+ }
+ }
+ for (v <- set2) {
+ val closestDistance = set1.map(w => distance1(v, w)).min
+ if (closestDistance > EPSILON) {
+ fail(errorMessage)
+ }
+ }
+ }
+
+ test("single cluster") {
+ val data = sc.parallelize(Array(
+ Array(1.0, 2.0, 6.0),
+ Array(1.0, 3.0, 0.0),
+ Array(1.0, 4.0, 6.0)
+ ))
+
+ // No matter how many runs or iterations we use, we should get one cluster,
+ // centered at the mean of the points
+
+ var model = KMeans.train(data, k=1, maxIterations=1)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=2)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(
+ data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+ }
+
+ test("single cluster with big dataset") {
+ val smallData = Array(
+ Array(1.0, 2.0, 6.0),
+ Array(1.0, 3.0, 0.0),
+ Array(1.0, 4.0, 6.0)
+ )
+ val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4)
+
+ // No matter how many runs or iterations we use, we should get one cluster,
+ // centered at the mean of the points
+
+ var model = KMeans.train(data, k=1, maxIterations=1)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=2)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+ }
+
+ test("k-means|| initialization") {
+ val points = Array(
+ Array(1.0, 2.0, 6.0),
+ Array(1.0, 3.0, 0.0),
+ Array(1.0, 4.0, 6.0),
+ Array(1.0, 0.0, 1.0),
+ Array(1.0, 1.0, 1.0)
+ )
+ val rdd = sc.parallelize(points)
+
+ // K-means|| initialization should place all clusters into distinct centers because
+ // it will make at least five passes, and it will give non-zero probability to each
+ // unselected point as long as it hasn't yet selected all of them
+
+ var model = KMeans.train(rdd, k=5, maxIterations=1)
+ assertSetsEqual(model.clusterCenters, points)
+
+ // Iterations of Lloyd's should not change the answer either
+ model = KMeans.train(rdd, k=5, maxIterations=10)
+ assertSetsEqual(model.clusterCenters, points)
+
+ // Neither should more runs
+ model = KMeans.train(rdd, k=5, maxIterations=10, runs=5)
+ assertSetsEqual(model.clusterCenters, points)
+ }
+}
diff --git a/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala
new file mode 100644
index 0000000000..f98590b8d9
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.recommendation
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+import spark.SparkContext._
+
+import org.jblas._
+
+
+class ALSSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ test("rank-1 matrices") {
+ testALS(10, 20, 1, 15, 0.7, 0.3)
+ }
+
+ test("rank-2 matrices") {
+ testALS(20, 30, 2, 15, 0.7, 0.3)
+ }
+
+ /**
+ * Test if we can correctly factorize R = U * P where U and P are of known rank.
+ *
+ * @param users number of users
+ * @param products number of products
+ * @param features number of features (rank of problem)
+ * @param iterations number of iterations to run
+ * @param samplingRate what fraction of the user-product pairs are known
+ * @param matchThreshold max difference allowed to consider a predicted rating correct
+ */
+ def testALS(users: Int, products: Int, features: Int, iterations: Int,
+ samplingRate: Double, matchThreshold: Double)
+ {
+ val rand = new Random(42)
+
+ // Create a random matrix with uniform values from -1 to 1
+ def randomMatrix(m: Int, n: Int) =
+ new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+
+ val userMatrix = randomMatrix(users, features)
+ val productMatrix = randomMatrix(features, products)
+ val trueRatings = userMatrix.mmul(productMatrix)
+
+ val sampledRatings = {
+ for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate)
+ yield (u, p, trueRatings.get(u, p))
+ }
+
+ val model = ALS.train(sc.parallelize(sampledRatings), features, iterations)
+
+ val predictedU = new DoubleMatrix(users, features)
+ for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
+ predictedU.put(u, i, vec(i))
+ }
+ val predictedP = new DoubleMatrix(products, features)
+ for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) {
+ predictedP.put(p, i, vec(i))
+ }
+ val predictedRatings = predictedU.mmul(predictedP.transpose)
+
+ for (u <- 0 until users; p <- 0 until products) {
+ val prediction = predictedRatings.get(u, p)
+ val correct = trueRatings.get(u, p)
+ if (math.abs(prediction - correct) > matchThreshold) {
+ fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
+ u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP))
+ }
+ }
+ }
+}
+
diff --git a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
new file mode 100644
index 0000000000..bc9bfd054f
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+import spark.SparkContext._
+
+
+class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ // Test if we can correctly learn A, B where Y = logistic(A + B*X)
+ test("logistic regression") {
+ val nPoints = 10000
+ val rnd = new Random(42)
+
+ val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
+
+ val A = 2.0
+ val B = -1.5
+
+ // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1)
+ val unifRand = new scala.util.Random(45)
+ val rLogis = (0 until nPoints).map { i =>
+ val u = unifRand.nextDouble()
+ math.log(u) - math.log(1.0-u)
+ }
+
+ // y <- A + B*x + rlogis(100)
+ // y <- as.numeric(y > 0)
+ val y = (0 until nPoints).map { i =>
+ val yVal = A + B * x1(i) + rLogis(i)
+ if (yVal > 0) 1.0 else 0.0
+ }
+
+ val testData = (0 until nPoints).map(i => (y(i).toDouble, Array(x1(i)))).toArray
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+ val lr = new LogisticRegression().setStepSize(10.0)
+ .setNumIterations(20)
+
+ val model = lr.train(testRDD)
+
+ val weight0 = model.weights.get(0)
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+ }
+}
diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
new file mode 100644
index 0000000000..3c588c6162
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+import spark.SparkContext._
+
+
+class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ // Test if we can correctly learn Y = 3 + X1 + X2 when
+ // X1 and X2 are collinear.
+ test("multi-collinear variables") {
+ val rnd = new Random(43)
+ val x1 = Array.fill[Double](20)(rnd.nextGaussian())
+
+    // Generate x2 tightly clustered around 0.1, which is close to the mean of x1
+    val rnd1 = new Random(42)
+ val x2 = Array.fill[Double](20)(0.1 + rnd1.nextGaussian() * 0.01)
+
+ val xMat = (0 until 20).map(i => Array(x1(i), x2(i))).toArray
+
+ val y = xMat.map(i => 3 + i(0) + i(1))
+ val testData = (0 until 20).map(i => (y(i), xMat(i))).toArray
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+ val ridgeReg = new RidgeRegression().setLowLambda(0)
+ .setHighLambda(10)
+
+ val model = ridgeReg.train(testRDD)
+
+ assert(model.intercept >= 2.9 && model.intercept <= 3.1)
+ assert(model.weights.length === 2)
+ assert(model.weights.get(0) >= 0.9 && model.weights.get(0) <= 1.1)
+ assert(model.weights.get(1) >= 0.9 && model.weights.get(1) <= 1.1)
+ }
+}