diff options
16 files changed, 385 insertions, 111 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 3d36761cda..4cf7eb96da 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -34,22 +34,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) : Iterator[T] = { val key = "rdd_%d_%d".format(rdd.id, split.index) - logInfo("Cache key is " + key) + logDebug("Looking for partition " + key) blockManager.get(key) match { - case Some(cachedValues) => - // Partition is in cache, so just return its values - logInfo("Found partition in cache!") - return cachedValues.asInstanceOf[Iterator[T]] + case Some(values) => + // Partition is already materialized, so just return its values + return values.asInstanceOf[Iterator[T]] case None => // Mark the split as loading (unless someone else marks it first) loading.synchronized { if (loading.contains(key)) { - logInfo("Loading contains " + key + ", waiting...") + logInfo("Another thread is loading %s, waiting for it to finish...".format(key)) while (loading.contains(key)) { try {loading.wait()} catch {case _ : Throwable =>} } - logInfo("Loading no longer contains " + key + ", so returning cached result") + logInfo("Finished waiting for %s".format(key)) // See whether someone else has successfully loaded it. The main way this would fail // is for the RDD-level cache eviction policy if someone else has loaded the same RDD // partition but we didn't want to make space for it. However, that case is unlikely @@ -59,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(values) => return values.asInstanceOf[Iterator[T]] case None => - logInfo("Whoever was loading " + key + " failed; we'll try it ourselves") + logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key)) loading.add(key) } } else { @@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } try { // If we got here, we have to load the split - logInfo("Computing partition " + split) + logInfo("Partition %s not found, computing it".format(key)) val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala index 5a24042e14..c87b66f047 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala @@ -34,7 +34,7 @@ class ApplicationSource(val application: ApplicationInfo) extends Source { override def getValue: Long = application.duration }) - metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] { override def getValue: Int = application.coresGranted }) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index 23d1cb77da..36c1b87b7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -26,17 +26,17 @@ private[spark] class MasterSource(val master: Master) extends Source { val sourceName = "master" // Gauge for worker numbers in cluster - metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] { override def getValue: Int = master.workers.size }) // Gauge for application numbers in cluster - metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] { override def getValue: Int = master.apps.size }) // Gauge for waiting application numbers in cluster - metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] { override def getValue: Int = master.waitingApps.size }) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala index df269fd047..b24e936b1f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala @@ -25,27 +25,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source { val sourceName = "worker" val metricRegistry = new MetricRegistry() - metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] { override def getValue: Int = worker.executors.size }) // Gauge for cores used of this worker - metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] { override def getValue: Int = worker.coresUsed }) // Gauge for memory used of this worker - metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("memUsed_MBytes"), new Gauge[Int] { override def getValue: Int = worker.memoryUsed }) // Gauge for cores free of this worker - metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] { override def getValue: Int = worker.coresFree }) // Gauge for memory free of this worker - metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("memFree_MBytes"), new Gauge[Int] { override def getValue: Int = worker.memoryFree }) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 18c9dc1c0a..34ed9c8f73 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -43,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts - metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) // Gauge for executor thread pool's approximate total number of tasks that have been completed - metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) // Gauge for executor thread pool's current number of threads - metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() }) // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool - metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { - registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L) - registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L) - registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0) - registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0) - registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0) + registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L) + registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0) + registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 446d490cc9..151514896f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -27,23 +27,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.DAGScheduler".format(sc.appName) - metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size }) - metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.running.size }) - metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size }) - metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() }) - metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size }) } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 24ef204aa1..6c500bad92 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -38,8 +38,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging def newKryoOutput() = new KryoOutput(bufferSize) - def newKryoInput() = new KryoInput(bufferSize) - def newKryo(): Kryo = { val instantiator = new ScalaKryoInstantiator val kryo = instantiator.newKryo() @@ -118,8 +116,10 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { val kryo = ks.newKryo() - val output = ks.newKryoOutput() - val input = ks.newKryoInput() + + // Make these lazy vals to avoid creating a buffer unless we use them + lazy val output = ks.newKryoOutput() + lazy val input = new KryoInput() def serialize[T](t: T): ByteBuffer = { output.clear() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 495a72db69..37d0ddb17b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -523,7 +523,17 @@ private[spark] class BlockManager( * Get a block from the block manager (either local or remote). */ def get(blockId: String): Option[Iterator[Any]] = { - getLocal(blockId).orElse(getRemote(blockId)) + val local = getLocal(blockId) + if (local.isDefined) { + logInfo("Found block %s locally".format(blockId)) + return local + } + val remote = getRemote(blockId) + if (remote.isDefined) { + logInfo("Found block %s remotely".format(blockId)) + return remote + } + None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index acc3951088..e5068d5587 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -28,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.BlockManager".format(sc.appName) - metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("memory", "maxMem_MBytes"), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) @@ -36,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar } }) - metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MBytes"), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) @@ -44,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar } }) - metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("memory", "memUsed_MBytes"), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) @@ -53,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar } }) - metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MBytes"), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val diskSpaceUsed = storageStatusList diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 07c9f2382b..8f0ec6683b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -26,7 +26,12 @@ class UISuite extends FunSuite { test("jetty port increases under contention") { val startPort = 4040 val server = new Server(startPort) - server.start() + + Try { server.start() } match { + case Success(s) => + case Failure(e) => + // Either case server port is busy hence setup for test complete + } val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq()) val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq()) diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index f991d86c8d..c1ff9c417c 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -144,10 +144,9 @@ Available algorithms for clustering: # Collaborative Filtering -[Collaborative -filtering](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) +[Collaborative filtering](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) is commonly used for recommender systems. These techniques aim to fill in the -missing entries of a user-product association matrix. MLlib currently supports +missing entries of a user-item association matrix. MLlib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. In particular, we implement the [alternating least squares @@ -158,7 +157,24 @@ following parameters: * *numBlocks* is the number of blacks used to parallelize computation (set to -1 to auto-configure). * *rank* is the number of latent factors in our model. * *iterations* is the number of iterations to run. -* *lambda* specifies the regularization parameter in ALS. +* *lambda* specifies the regularization parameter in ALS. +* *implicitPrefs* specifies whether to use the *explicit feedback* ALS variant or one adapted for *implicit feedback* data +* *alpha* is a parameter applicable to the implicit feedback variant of ALS that governs the *baseline* confidence in preference observations + +## Explicit vs Implicit Feedback + +The standard approach to matrix factorization based collaborative filtering treats +the entries in the user-item matrix as *explicit* preferences given by the user to the item. + +It is common in many real-world use cases to only have access to *implicit feedback* +(e.g. views, clicks, purchases, likes, shares etc.). The approach used in MLlib to deal with +such data is taken from +[Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433). +Essentially instead of trying to model the matrix of ratings directly, this approach treats the data as +a combination of binary preferences and *confidence values*. The ratings are then related +to the level of confidence in observed user preferences, rather than explicit ratings given to items. +The model then tries to find latent factors that can be used to predict the expected preference of a user +for an item. Available algorithms for collaborative filtering: diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 7da9355b5e..65868b76b9 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -155,7 +155,7 @@ def is_active(instance): # Return correct versions of Spark and Shark, given the supplied Spark version def get_spark_shark_version(opts): - spark_shark_map = {"0.7.3": "0.7.0", "0.8.0": "0.8.0"} + spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"} version = opts.spark_version.replace("v", "") if version not in spark_shark_map: print >> stderr, "Don't know about Spark version: %s" % version diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index be002d02bc..36853acab5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -21,7 +21,8 @@ import scala.collection.mutable.{ArrayBuffer, BitSet} import scala.util.Random import scala.util.Sorting -import org.apache.spark.{HashPartitioner, Partitioner, SparkContext} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoRegistrator @@ -61,6 +62,12 @@ case class Rating(val user: Int, val product: Int, val rating: Double) /** * Alternating Least Squares matrix factorization. * + * ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices, + * `X` and `Y`, i.e. `Xt * Y = R`. Typically these approximations are called 'factor' matrices. + * The general approach is iterative. During each iteration, one of the factor matrices is held + * constant, while the other is solved for using least squares. The newly-solved factor matrix is + * then held constant while solving for the other factor matrix. + * * This is a blocked implementation of the ALS factorization algorithm that groups the two sets * of factors (referred to as "users" and "products") into blocks and reduces communication by only * sending one copy of each user vector to each product block on each iteration, and only for the @@ -70,11 +77,21 @@ case class Rating(val user: Int, val product: Int, val rating: Double) * vectors it receives from each user block it will depend on). This allows us to send only an * array of feature vectors between each user block and product block, and have the product block * find the users' ratings and update the products based on these messages. + * + * For implicit preference data, the algorithm used is based on + * "Collaborative Filtering for Implicit Feedback Datasets", available at + * [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here. + * + * Essentially instead of finding the low-rank approximations to the rating matrix `R`, + * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0 + * and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user + * preferences rather than explicit ratings given to items. */ -class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double) - extends Serializable +class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double, + var implicitPrefs: Boolean, var alpha: Double) + extends Serializable with Logging { - def this() = this(-1, 10, 10, 0.01) + def this() = this(-1, 10, 10, 0.01, false, 1.0) /** * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured @@ -103,6 +120,16 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l this } + def setImplicitPrefs(implicitPrefs: Boolean): ALS = { + this.implicitPrefs = implicitPrefs + this + } + + def setAlpha(alpha: Double): ALS = { + this.alpha = alpha + this + } + /** * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples. * Returns a MatrixFactorizationModel with feature vectors for each user and product. @@ -147,19 +174,24 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l } } - for (iter <- 0 until iterations) { + for (iter <- 1 to iterations) { // perform ALS update - products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda) - users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda) + logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) + // YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model + val YtY = computeYtY(users) + val YtYb = ratings.context.broadcast(YtY) + products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, + alpha, YtYb) + logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) + val XtX = computeYtY(products) + val XtXb = ratings.context.broadcast(XtX) + users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, + alpha, XtXb) } // Flatten and cache the two final RDDs to un-block them - val usersOut = users.join(userOutLinks).flatMap { case (b, (factors, outLinkBlock)) => - for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) - } - val productsOut = products.join(productOutLinks).flatMap { case (b, (factors, outLinkBlock)) => - for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) - } + val usersOut = unblockFactors(users, userOutLinks) + val productsOut = unblockFactors(products, productOutLinks) usersOut.persist() productsOut.persist() @@ -168,6 +200,40 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l } /** + * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors + * for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as + * the driver program requires `YtY` to broadcast it to the slaves + * @param factors the (block-distributed) user or product factor vectors + * @return Option[YtY] - whose value is only used in the implicit preference model + */ + def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { + if (implicitPrefs) { + Option( + factors.flatMapValues{ case factorArray => + factorArray.map{ vector => + val x = new DoubleMatrix(vector) + x.mmul(x.transpose()) + } + }.reduceByKeyLocally((a, b) => a.addi(b)) + .values + .reduce((a, b) => a.addi(b)) + ) + } else { + None + } + } + + /** + * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs + */ + def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])], + outLinks: RDD[(Int, OutLinkBlock)]) = { + blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) => + for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) + } + } + + /** * Make the out-links table for a block of the users (or products) dataset given the list of * (user, product, rating) values for the users in that block (or the opposite for products). */ @@ -251,7 +317,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l userInLinks: RDD[(Int, InLinkBlock)], partitioner: Partitioner, rank: Int, - lambda: Double) + lambda: Double, + alpha: Double, + YtY: Broadcast[Option[DoubleMatrix]]) : RDD[(Int, Array[Array[Double]])] = { val numBlocks = products.partitions.size @@ -265,7 +333,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) } }.groupByKey(partitioner) .join(userInLinks) - .mapValues{ case (messages, inLinkBlock) => updateBlock(messages, inLinkBlock, rank, lambda) } + .mapValues{ case (messages, inLinkBlock) => + updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY) + } } /** @@ -273,7 +343,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l * it received from each product and its InLinkBlock. */ def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, - rank: Int, lambda: Double) + rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]]) : Array[Array[Double]] = { // Sort the incoming block factor messages by block ID and make them an array @@ -298,8 +368,14 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l fillXtX(x, tempXtX) val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p) for (i <- 0 until us.length) { - userXtX(us(i)).addi(tempXtX) - SimpleBlas.axpy(rs(i), x, userXy(us(i))) + implicitPrefs match { + case false => + userXtX(us(i)).addi(tempXtX) + SimpleBlas.axpy(rs(i), x, userXy(us(i))) + case true => + userXtX(us(i)).addi(tempXtX.mul(alpha * rs(i))) + SimpleBlas.axpy(1 + alpha * rs(i), x, userXy(us(i))) + } } } } @@ -311,7 +387,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l // Add regularization (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda) // Solve the resulting matrix, which is symmetric and positive-definite - Solve.solvePositive(fullXtX, userXy(index)).data + implicitPrefs match { + case false => Solve.solvePositive(fullXtX, userXy(index)).data + case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data + } } } @@ -381,7 +460,7 @@ object ALS { blocks: Int) : MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda).run(ratings) + new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings) } /** @@ -419,6 +498,68 @@ object ALS { train(ratings, rank, iterations, 0.01, -1) } + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users + * to some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. This is done using + * a level of parallelism given by `blocks`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param alpha confidence parameter (only applies when immplicitPrefs = true) + */ + def trainImplicit( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double) + : MatrixFactorizationModel = + { + new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings) + } + + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users to + * some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. The level of + * parallelism is determined automatically based on the number of partitions in `ratings`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + */ + def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double) + : MatrixFactorizationModel = + { + trainImplicit(ratings, rank, iterations, lambda, -1, alpha) + } + + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' ratings given by + * users to some products, in the form of (userID, productID, rating) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. The level of + * parallelism is determined automatically based on the number of partitions in `ratings`. + * Model parameters `alpha` and `lambda` are set to reasonable default values + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + */ + def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int) + : MatrixFactorizationModel = + { + trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) + } + private class ALSRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[Rating]) @@ -426,29 +567,37 @@ object ALS { } def main(args: Array[String]) { - if (args.length != 5 && args.length != 6) { - println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]") + if (args.length < 5 || args.length > 9) { + println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> " + + "[<lambda>] [<implicitPrefs>] [<alpha>] [<blocks>]") System.exit(1) } val (master, ratingsFile, rank, iters, outputDir) = (args(0), args(1), args(2).toInt, args(3).toInt, args(4)) - val blocks = if (args.length == 6) args(5).toInt else -1 + val lambda = if (args.length >= 6) args(5).toDouble else 0.01 + val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false + val alpha = if (args.length >= 8) args(7).toDouble else 1 + val blocks = if (args.length == 9) args(8).toInt else -1 + System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName) System.setProperty("spark.kryo.referenceTracking", "false") System.setProperty("spark.kryoserializer.buffer.mb", "8") System.setProperty("spark.locality.wait", "10000") + val sc = new SparkContext(master, "ALS") val ratings = sc.textFile(ratingsFile).map { line => val fields = line.split(',') Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) } - val model = ALS.train(ratings, rank, iters, 0.01, blocks) + val model = new ALS(rank = rank, iterations = iters, lambda = lambda, + numBlocks = blocks, implicitPrefs = implicitPrefs, alpha = alpha).run(ratings) + model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") } .saveAsTextFile(outputDir + "/userFeatures") model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") } .saveAsTextFile(outputDir + "/productFeatures") println("Final user/product features written to " + outputDir) - System.exit(0) + sc.stop() } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index 3323f6cee2..eafee060cd 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.mllib.recommendation; import java.io.Serializable; import java.util.List; +import java.lang.Math; import scala.Tuple2; @@ -48,7 +49,7 @@ public class JavaALSSuite implements Serializable { } void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, - DoubleMatrix trueRatings, double matchThreshold) { + DoubleMatrix trueRatings, double matchThreshold, boolean implicitPrefs, DoubleMatrix truePrefs) { DoubleMatrix predictedU = new DoubleMatrix(users, features); List<scala.Tuple2<Object, double[]>> userFeatures = model.userFeatures().toJavaRDD().collect(); for (int i = 0; i < features; ++i) { @@ -68,12 +69,32 @@ public class JavaALSSuite implements Serializable { DoubleMatrix predictedRatings = predictedU.mmul(predictedP.transpose()); - for (int u = 0; u < users; ++u) { - for (int p = 0; p < products; ++p) { - double prediction = predictedRatings.get(u, p); - double correct = trueRatings.get(u, p); - Assert.assertTrue(Math.abs(prediction - correct) < matchThreshold); + if (!implicitPrefs) { + for (int u = 0; u < users; ++u) { + for (int p = 0; p < products; ++p) { + double prediction = predictedRatings.get(u, p); + double correct = trueRatings.get(u, p); + Assert.assertTrue(String.format("Prediction=%2.4f not below match threshold of %2.2f", + prediction, matchThreshold), Math.abs(prediction - correct) < matchThreshold); + } } + } else { + // For implicit prefs we use the confidence-weighted RMSE to test (ref Mahout's implicit ALS tests) + double sqErr = 0.0; + double denom = 0.0; + for (int u = 0; u < users; ++u) { + for (int p = 0; p < products; ++p) { + double prediction = predictedRatings.get(u, p); + double truePref = truePrefs.get(u, p); + double confidence = 1.0 + /* alpha = */ 1.0 * trueRatings.get(u, p); + double err = confidence * (truePref - prediction) * (truePref - prediction); + sqErr += err; + denom += 1.0; + } + } + double rmse = Math.sqrt(sqErr / denom); + Assert.assertTrue(String.format("Confidence-weighted RMSE=%2.4f above threshold of %2.2f", + rmse, matchThreshold), Math.abs(rmse) < matchThreshold); } } @@ -81,30 +102,62 @@ public class JavaALSSuite implements Serializable { public void runALSUsingStaticMethods() { int features = 1; int iterations = 15; - int users = 10; - int products = 10; - scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7); + int users = 50; + int products = 100; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations); - validatePrediction(model, users, products, features, testData._2(), 0.3); + validatePrediction(model, users, products, features, testData._2(), 0.3, false, testData._3()); } @Test public void runALSUsingConstructor() { int features = 2; int iterations = 15; - int users = 20; - int products = 30; - scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7); + int users = 100; + int products = 200; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); MatrixFactorizationModel model = new ALS().setRank(features) .setIterations(iterations) .run(data.rdd()); - validatePrediction(model, users, products, features, testData._2(), 0.3); + validatePrediction(model, users, products, features, testData._2(), 0.3, false, testData._3()); + } + + @Test + public void runImplicitALSUsingStaticMethods() { + int features = 1; + int iterations = 15; + int users = 80; + int products = 160; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, true); + + JavaRDD<Rating> data = sc.parallelize(testData._1()); + MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations); + validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); + } + + @Test + public void runImplicitALSUsingConstructor() { + int features = 2; + int iterations = 15; + int users = 100; + int products = 200; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, true); + + JavaRDD<Rating> data = sc.parallelize(testData._1()); + + MatrixFactorizationModel model = new ALS().setRank(features) + .setIterations(iterations) + .setImplicitPrefs(true) + .run(data.rdd()); + validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 347ef238f4..fafc5ec5f2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -34,16 +34,19 @@ object ALSSuite { users: Int, products: Int, features: Int, - samplingRate: Double): (java.util.List[Rating], DoubleMatrix) = { - val (sampledRatings, trueRatings) = generateRatings(users, products, features, samplingRate) - (seqAsJavaList(sampledRatings), trueRatings) + samplingRate: Double, + implicitPrefs: Boolean): (java.util.List[Rating], DoubleMatrix, DoubleMatrix) = { + val (sampledRatings, trueRatings, truePrefs) = + generateRatings(users, products, features, samplingRate, implicitPrefs) + (seqAsJavaList(sampledRatings), trueRatings, truePrefs) } def generateRatings( users: Int, products: Int, features: Int, - samplingRate: Double): (Seq[Rating], DoubleMatrix) = { + samplingRate: Double, + implicitPrefs: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = { val rand = new Random(42) // Create a random matrix with uniform values from -1 to 1 @@ -52,14 +55,20 @@ object ALSSuite { val userMatrix = randomMatrix(users, features) val productMatrix = randomMatrix(features, products) - val trueRatings = userMatrix.mmul(productMatrix) + val (trueRatings, truePrefs) = implicitPrefs match { + case true => + val raw = new DoubleMatrix(users, products, Array.fill(users * products)(rand.nextInt(10).toDouble): _*) + val prefs = new DoubleMatrix(users, products, raw.data.map(v => if (v > 0) 1.0 else 0.0): _*) + (raw, prefs) + case false => (userMatrix.mmul(productMatrix), null) + } val sampledRatings = { for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate) yield Rating(u, p, trueRatings.get(u, p)) } - (sampledRatings, trueRatings) + (sampledRatings, trueRatings, truePrefs) } } @@ -78,11 +87,19 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { } test("rank-1 matrices") { - testALS(10, 20, 1, 15, 0.7, 0.3) + testALS(50, 100, 1, 15, 0.7, 0.3) } test("rank-2 matrices") { - testALS(20, 30, 2, 15, 0.7, 0.3) + testALS(100, 200, 2, 15, 0.7, 0.3) + } + + test("rank-1 matrices implicit") { + testALS(80, 160, 1, 15, 0.7, 0.4, true) + } + + test("rank-2 matrices implicit") { + testALS(100, 200, 2, 15, 0.7, 0.4, true) } /** @@ -96,11 +113,14 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { * @param matchThreshold max difference allowed to consider a predicted rating correct */ def testALS(users: Int, products: Int, features: Int, iterations: Int, - samplingRate: Double, matchThreshold: Double) + samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false) { - val (sampledRatings, trueRatings) = ALSSuite.generateRatings(users, products, - features, samplingRate) - val model = ALS.train(sc.parallelize(sampledRatings), features, iterations) + val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, + features, samplingRate, implicitPrefs) + val model = implicitPrefs match { + case false => ALS.train(sc.parallelize(sampledRatings), features, iterations) + case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations) + } val predictedU = new DoubleMatrix(users, features) for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) { @@ -112,12 +132,31 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { } val predictedRatings = predictedU.mmul(predictedP.transpose) - for (u <- 0 until users; p <- 0 until products) { - val prediction = predictedRatings.get(u, p) - val correct = trueRatings.get(u, p) - if (math.abs(prediction - correct) > matchThreshold) { - fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( - u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP)) + if (!implicitPrefs) { + for (u <- 0 until users; p <- 0 until products) { + val prediction = predictedRatings.get(u, p) + val correct = trueRatings.get(u, p) + if (math.abs(prediction - correct) > matchThreshold) { + fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( + u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP)) + } + } + } else { + // For implicit prefs we use the confidence-weighted RMSE to test (ref Mahout's tests) + var sqErr = 0.0 + var denom = 0.0 + for (u <- 0 until users; p <- 0 until products) { + val prediction = predictedRatings.get(u, p) + val truePref = truePrefs.get(u, p) + val confidence = 1 + 1.0 * trueRatings.get(u, p) + val err = confidence * (truePref - prediction) * (truePref - prediction) + sqErr += err + denom += 1 + } + val rmse = math.sqrt(sqErr / denom) + if (math.abs(rmse) > matchThreshold) { + fail("Model failed to predict RMSE: %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( + rmse, truePrefs, predictedRatings, predictedU, predictedP)) } } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index cdec6168af..eb4b96eb47 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -156,6 +156,7 @@ object SparkBuild extends Build { */ + libraryDependencies ++= Seq( "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", "org.scalatest" %% "scalatest" % "1.9.1" % "test", @@ -178,6 +179,7 @@ object SparkBuild extends Build { val slf4jVersion = "1.7.2" + val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject") val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson") val excludeNetty = ExclusionRule(organization = "org.jboss.netty") val excludeAsm = ExclusionRule(organization = "asm") @@ -210,7 +212,7 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.13.0", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), "net.java.dev.jets3t" % "jets3t" % "0.7.1", "org.apache.avro" % "avro" % "1.7.4", "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), @@ -248,6 +250,7 @@ object SparkBuild extends Build { exclude("log4j","log4j") exclude("org.apache.cassandra.deps", "avro") excludeAll(excludeSnappy) + excludeAll(excludeCglib) ) ) ++ assemblySettings ++ extraAssemblySettings @@ -290,10 +293,10 @@ object SparkBuild extends Build { def yarnEnabledSettings = Seq( libraryDependencies ++= Seq( // Exclude rule required for all ? - "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) ) ) |