diff options
author | Xiangrui Meng <meng@databricks.com> | 2014-03-18 17:20:42 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-03-18 17:20:42 -0700 |
commit | f9d8a83c0006bb59c61e8770cd201b72333cb9a4 (patch) | |
tree | 31690aca1930996c7e9955311928228fb14541e5 | |
parent | e108b9ab94c4310ec56ef0eda99bb904133f942d (diff) | |
download | spark-f9d8a83c0006bb59c61e8770cd201b72333cb9a4.tar.gz spark-f9d8a83c0006bb59c61e8770cd201b72333cb9a4.tar.bz2 spark-f9d8a83c0006bb59c61e8770cd201b72333cb9a4.zip |
[SPARK-1266] persist factors in implicit ALS
In implicit ALS computation, the user or product factor is used twice in each iteration. Caching can certainly help accelerate the computation. I saw the running time decrease by ~70% for implicit ALS on the MovieLens data.
I also made the following changes:
1. Change `YtYb` type from `Broadcast[Option[DoubleMatrix]]` to `Option[Broadcast[DoubleMatrix]]`, so we don't need to broadcast None in explicit computation.
2. Mark methods `computeYtY`, `unblockFactors`, `updateBlock`, and `updateFeatures` private. Users do not need those methods.
3. Materialize the final matrix factors before returning the model. It allows us to clean up other cached RDDs before returning the model. I do not have a better solution here, so I use `RDD.count()`.
JIRA: https://spark-project.atlassian.net/browse/SPARK-1266
Author: Xiangrui Meng <meng@databricks.com>
Closes #165 from mengxr/als and squashes the following commits:
c9676a6 [Xiangrui Meng] add a comment about the last products.persist
d3a88aa [Xiangrui Meng] change implicitPrefs match to if ... else ...
63862d6 [Xiangrui Meng] persist factors in implicit ALS
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 145 |
1 files changed, 89 insertions, 56 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 777d0db2d6..0cc9f48769 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -148,8 +148,10 @@ class ALS private ( * Returns a MatrixFactorizationModel with feature vectors for each user and product. */ def run(ratings: RDD[Rating]): MatrixFactorizationModel = { + val sc = ratings.context + val numBlocks = if (this.numBlocks == -1) { - math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2) + math.max(sc.defaultParallelism, ratings.partitions.size / 2) } else { this.numBlocks } @@ -187,21 +189,41 @@ class ALS private ( } } - for (iter <- 1 to iterations) { - // perform ALS update - logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) - // YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model - val YtY = computeYtY(users) - val YtYb = ratings.context.broadcast(YtY) - products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, - alpha, YtYb) - logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) - val XtX = computeYtY(products) - val XtXb = ratings.context.broadcast(XtX) - users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, - alpha, XtXb) + if (implicitPrefs) { + for (iter <- 1 to iterations) { + // perform ALS update + logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) + // Persist users because it will be called twice. 
+ users.persist() + val YtY = Some(sc.broadcast(computeYtY(users))) + val previousProducts = products + products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, + alpha, YtY) + previousProducts.unpersist() + logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) + products.persist() + val XtX = Some(sc.broadcast(computeYtY(products))) + val previousUsers = users + users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, + alpha, XtX) + previousUsers.unpersist() + } + } else { + for (iter <- 1 to iterations) { + // perform ALS update + logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) + products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, + alpha, YtY = None) + logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) + users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, + alpha, YtY = None) + } } + // The last `products` will be used twice. One to generate the last `users` and the other to + // generate `productsOut`. So we cache it for better performance. + products.persist() + // Flatten and cache the two final RDDs to un-block them val usersOut = unblockFactors(users, userOutLinks) val productsOut = unblockFactors(products, productOutLinks) @@ -209,31 +231,39 @@ class ALS private ( usersOut.persist() productsOut.persist() + // Materialize usersOut and productsOut. + usersOut.count() + productsOut.count() + + products.unpersist() + + // Clean up. + userInLinks.unpersist() + userOutLinks.unpersist() + productInLinks.unpersist() + productOutLinks.unpersist() + new MatrixFactorizationModel(rank, usersOut, productsOut) } /** * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors - * for each user (or product), in a distributed fashion. 
Here `reduceByKeyLocally` is used as - * the driver program requires `YtY` to broadcast it to the slaves + * for each user (or product), in a distributed fashion. + * * @param factors the (block-distributed) user or product factor vectors - * @return Option[YtY] - whose value is only used in the implicit preference model + * @return YtY - whose value is only used in the implicit preference model */ - def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { - if (implicitPrefs) { - val n = rank * (rank + 1) / 2 - val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => { - Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L)) - L - }, combOp = (L1, L2) => { - L1.addi(L2) - }) - val YtY = new DoubleMatrix(rank, rank) - fillFullMatrix(LYtY, YtY) - Option(YtY) - } else { - None - } + private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { + val n = rank * (rank + 1) / 2 + val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => { + Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L)) + L + }, combOp = (L1, L2) => { + L1.addi(L2) + }) + val YtY = new DoubleMatrix(rank, rank) + fillFullMatrix(LYtY, YtY) + YtY } /** @@ -264,7 +294,7 @@ class ALS private ( /** * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs */ - def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])], + private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])], outLinks: RDD[(Int, OutLinkBlock)]) = { blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) => for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) @@ -332,8 +362,11 @@ class ALS private ( val outLinkBlock = makeOutLinkBlock(numBlocks, ratings) Iterator.single((blockId, (inLinkBlock, outLinkBlock))) }, true) - links.persist(StorageLevel.MEMORY_AND_DISK) - (links.mapValues(_._1), links.mapValues(_._2)) + val inLinks = links.mapValues(_._1) + val outLinks = links.mapValues(_._2) + 
inLinks.persist(StorageLevel.MEMORY_AND_DISK) + outLinks.persist(StorageLevel.MEMORY_AND_DISK) + (inLinks, outLinks) } /** @@ -365,7 +398,7 @@ class ALS private ( rank: Int, lambda: Double, alpha: Double, - YtY: Broadcast[Option[DoubleMatrix]]) + YtY: Option[Broadcast[DoubleMatrix]]) : RDD[(Int, Array[Array[Double]])] = { val numBlocks = products.partitions.size @@ -388,8 +421,8 @@ class ALS private ( * Compute the new feature vectors for a block of the users matrix given the list of factors * it received from each product and its InLinkBlock. */ - def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, - rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]]) + private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, + rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]]) : Array[Array[Double]] = { // Sort the incoming block factor messages by block ID and make them an array @@ -416,21 +449,20 @@ class ALS private ( dspr(1.0, x, tempXtX) val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p) for (i <- 0 until us.length) { - implicitPrefs match { - case false => - userXtX(us(i)).addi(tempXtX) - SimpleBlas.axpy(rs(i), x, userXy(us(i))) - case true => - // Extension to the original paper to handle rs(i) < 0. confidence is a function - // of |rs(i)| instead so that it is never negative: - val confidence = 1 + alpha * abs(rs(i)) - SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i))) - // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i) - // means we try to reconstruct 0. We add terms only where P = 1, so, term below - // is now only added for rs(i) > 0: - if (rs(i) > 0) { - SimpleBlas.axpy(confidence, x, userXy(us(i))) - } + if (implicitPrefs) { + // Extension to the original paper to handle rs(i) < 0. 
confidence is a function + // of |rs(i)| instead so that it is never negative: + val confidence = 1 + alpha * abs(rs(i)) + SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i))) + // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i) + // means we try to reconstruct 0. We add terms only where P = 1, so, term below + // is now only added for rs(i) > 0: + if (rs(i) > 0) { + SimpleBlas.axpy(confidence, x, userXy(us(i))) + } + } else { + userXtX(us(i)).addi(tempXtX) + SimpleBlas.axpy(rs(i), x, userXy(us(i))) } } } @@ -443,9 +475,10 @@ class ALS private ( // Add regularization (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda) // Solve the resulting matrix, which is symmetric and positive-definite - implicitPrefs match { - case false => Solve.solvePositive(fullXtX, userXy(index)).data - case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data + if (implicitPrefs) { + Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data + } else { + Solve.solvePositive(fullXtX, userXy(index)).data } } } |