author     Xiangrui Meng <meng@databricks.com>    2014-03-18 17:20:42 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-03-18 17:20:42 -0700
commit     f9d8a83c0006bb59c61e8770cd201b72333cb9a4 (patch)
tree       31690aca1930996c7e9955311928228fb14541e5 /mllib
parent     e108b9ab94c4310ec56ef0eda99bb904133f942d (diff)
download   spark-f9d8a83c0006bb59c61e8770cd201b72333cb9a4.tar.gz
           spark-f9d8a83c0006bb59c61e8770cd201b72333cb9a4.tar.bz2
           spark-f9d8a83c0006bb59c61e8770cd201b72333cb9a4.zip
[SPARK-1266] persist factors in implicit ALS
In implicit ALS computation, the user or product factor is used twice in each iteration. Caching can certainly help accelerate the computation. I saw the running time decrease by ~70% for implicit ALS on the MovieLens data.

I also made the following changes:

1. Change the type of `YtYb` from `Broadcast[Option[DoubleMatrix]]` to `Option[Broadcast[DoubleMatrix]]`, so we don't need to broadcast `None` in the explicit computation.
2. Mark the methods `computeYtY`, `unblockFactors`, `updateBlock`, and `updateFeatures` private. Users do not need those methods.
3. Materialize the final matrix factors before returning the model. This allows us to clean up the other cached RDDs before returning the model. I do not have a better solution here, so I use `RDD.count()`.

JIRA: https://spark-project.atlassian.net/browse/SPARK-1266

Author: Xiangrui Meng <meng@databricks.com>

Closes #165 from mengxr/als and squashes the following commits:

c9676a6 [Xiangrui Meng] add a comment about the last products.persist
d3a88aa [Xiangrui Meng] change implicitPrefs match to if ... else ...
63862d6 [Xiangrui Meng] persist factors in implicit ALS
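A note on change 1: nesting `Option` outside `Broadcast` means the explicit-feedback path never creates a broadcast variable just to ship `None`; only the implicit path pays the broadcast cost. Below is a minimal, self-contained sketch of the pattern; the demo object, the local master, and the identity-matrix payload are illustrative stand-ins, not code from this patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.jblas.DoubleMatrix

object BroadcastOptionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val implicitPrefs = args.contains("--implicit") // stand-in for the ALS flag

    // Before: Broadcast[Option[DoubleMatrix]] -- a broadcast variable is created
    // and shipped to executors even when the payload is None (explicit ALS).
    // After: Option[Broadcast[DoubleMatrix]] -- broadcast only when needed.
    val YtY: Option[Broadcast[DoubleMatrix]] =
      if (implicitPrefs) Some(sc.broadcast(DoubleMatrix.eye(10))) else None

    // Executors now unwrap with YtY.get.value instead of YtY.value.get.
    val rows = sc.parallelize(1 to 4).map(_ => YtY.map(_.value.rows).getOrElse(0)).collect()
    println(rows.mkString(","))
    sc.stop()
  }
}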
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala  145
1 file changed, 89 insertions(+), 56 deletions(-)
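Change 3 also deserves a standalone illustration: because RDDs are lazy, unpersisting an upstream RDD before its downstream dependents have been computed would force them to be recomputed from the raw lineage later. Materializing first (the commit uses `RDD.count()` for lack of a better hook) makes the cleanup safe. A small sketch, with placeholder RDD contents:

import org.apache.spark.{SparkConf, SparkContext}

object MaterializeBeforeUnpersist {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    val intermediate = sc.parallelize(1 to 1000000).map(_ * 2).persist()
    val result = intermediate.map(_ + 1).persist()

    // Force `result` to be computed and cached now; count() is a cheap action.
    result.count()

    // Safe to drop `intermediate`: `result` is already materialized, so
    // unpersisting cannot trigger a recomputation from the lineage.
    intermediate.unpersist()

    println(result.take(3).mkString(","))
    sc.stop()
  }
}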
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 777d0db2d6..0cc9f48769 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -148,8 +148,10 @@ class ALS private (
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
*/
def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
+ val sc = ratings.context
+
val numBlocks = if (this.numBlocks == -1) {
- math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
+ math.max(sc.defaultParallelism, ratings.partitions.size / 2)
} else {
this.numBlocks
}
@@ -187,21 +189,41 @@ class ALS private (
}
}
- for (iter <- 1 to iterations) {
- // perform ALS update
- logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
- // YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model
- val YtY = computeYtY(users)
- val YtYb = ratings.context.broadcast(YtY)
- products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
- alpha, YtYb)
- logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
- val XtX = computeYtY(products)
- val XtXb = ratings.context.broadcast(XtX)
- users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
- alpha, XtXb)
+ if (implicitPrefs) {
+ for (iter <- 1 to iterations) {
+ // perform ALS update
+ logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+ // Persist users because it will be used twice.
+ users.persist()
+ val YtY = Some(sc.broadcast(computeYtY(users)))
+ val previousProducts = products
+ products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
+ alpha, YtY)
+ previousProducts.unpersist()
+ logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+ products.persist()
+ val XtX = Some(sc.broadcast(computeYtY(products)))
+ val previousUsers = users
+ users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
+ alpha, XtX)
+ previousUsers.unpersist()
+ }
+ } else {
+ for (iter <- 1 to iterations) {
+ // perform ALS update
+ logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+ products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
+ alpha, YtY = None)
+ logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+ users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
+ alpha, YtY = None)
+ }
}
+ // The last `products` will be used twice: once to generate the last `users` and again to
+ // generate `productsOut`. So we cache it for better performance.
+ products.persist()
+
// Flatten and cache the two final RDDs to un-block them
val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)
@@ -209,31 +231,39 @@ class ALS private (
usersOut.persist()
productsOut.persist()
+ // Materialize usersOut and productsOut.
+ usersOut.count()
+ productsOut.count()
+
+ products.unpersist()
+
+ // Clean up.
+ userInLinks.unpersist()
+ userOutLinks.unpersist()
+ productInLinks.unpersist()
+ productOutLinks.unpersist()
+
new MatrixFactorizationModel(rank, usersOut, productsOut)
}
/**
* Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors
- * for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as
- * the driver program requires `YtY` to broadcast it to the slaves
+ * for each user (or product), in a distributed fashion.
+ *
* @param factors the (block-distributed) user or product factor vectors
- * @return Option[YtY] - whose value is only used in the implicit preference model
+ * @return YtY - whose value is only used in the implicit preference model
*/
- def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
- if (implicitPrefs) {
- val n = rank * (rank + 1) / 2
- val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
- Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
- L
- }, combOp = (L1, L2) => {
- L1.addi(L2)
- })
- val YtY = new DoubleMatrix(rank, rank)
- fillFullMatrix(LYtY, YtY)
- Option(YtY)
- } else {
- None
- }
+ private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
+ val n = rank * (rank + 1) / 2
+ val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
+ Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
+ L
+ }, combOp = (L1, L2) => {
+ L1.addi(L2)
+ })
+ val YtY = new DoubleMatrix(rank, rank)
+ fillFullMatrix(LYtY, YtY)
+ YtY
}
/**
@@ -264,7 +294,7 @@ class ALS private (
/**
* Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
*/
- def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
+ private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
outLinks: RDD[(Int, OutLinkBlock)]) = {
blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
@@ -332,8 +362,11 @@ class ALS private (
val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
}, true)
- links.persist(StorageLevel.MEMORY_AND_DISK)
- (links.mapValues(_._1), links.mapValues(_._2))
+ val inLinks = links.mapValues(_._1)
+ val outLinks = links.mapValues(_._2)
+ inLinks.persist(StorageLevel.MEMORY_AND_DISK)
+ outLinks.persist(StorageLevel.MEMORY_AND_DISK)
+ (inLinks, outLinks)
}
/**
@@ -365,7 +398,7 @@ class ALS private (
rank: Int,
lambda: Double,
alpha: Double,
- YtY: Broadcast[Option[DoubleMatrix]])
+ YtY: Option[Broadcast[DoubleMatrix]])
: RDD[(Int, Array[Array[Double]])] =
{
val numBlocks = products.partitions.size
@@ -388,8 +421,8 @@ class ALS private (
* Compute the new feature vectors for a block of the users matrix given the list of factors
* it received from each product and its InLinkBlock.
*/
- def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
- rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]])
+ private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
+ rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
: Array[Array[Double]] =
{
// Sort the incoming block factor messages by block ID and make them an array
@@ -416,21 +449,20 @@ class ALS private (
dspr(1.0, x, tempXtX)
val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
for (i <- 0 until us.length) {
- implicitPrefs match {
- case false =>
- userXtX(us(i)).addi(tempXtX)
- SimpleBlas.axpy(rs(i), x, userXy(us(i)))
- case true =>
- // Extension to the original paper to handle rs(i) < 0. confidence is a function
- // of |rs(i)| instead so that it is never negative:
- val confidence = 1 + alpha * abs(rs(i))
- SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
- // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
- // means we try to reconstruct 0. We add terms only where P = 1, so, term below
- // is now only added for rs(i) > 0:
- if (rs(i) > 0) {
- SimpleBlas.axpy(confidence, x, userXy(us(i)))
- }
+ if (implicitPrefs) {
+ // Extension to the original paper to handle rs(i) < 0. confidence is a function
+ // of |rs(i)| instead so that it is never negative:
+ val confidence = 1 + alpha * abs(rs(i))
+ SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
+ // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
+ // means we try to reconstruct 0. We add terms only where P = 1, so, term below
+ // is now only added for rs(i) > 0:
+ if (rs(i) > 0) {
+ SimpleBlas.axpy(confidence, x, userXy(us(i)))
+ }
+ } else {
+ userXtX(us(i)).addi(tempXtX)
+ SimpleBlas.axpy(rs(i), x, userXy(us(i)))
}
}
}
@@ -443,9 +475,10 @@ class ALS private (
// Add regularization
(0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
// Solve the resulting matrix, which is symmetric and positive-definite
- implicitPrefs match {
- case false => Solve.solvePositive(fullXtX, userXy(index)).data
- case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data
+ if (implicitPrefs) {
+ Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
+ } else {
+ Solve.solvePositive(fullXtX, userXy(index)).data
}
}
}
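Finally, some context for the implicit-feedback branch rewritten in the last two hunks. Following the formulation the comments call "the original paper" (Hu, Koren, and Volinsky's implicit-feedback ALS), each rating r_i yields a confidence c_i = 1 + alpha * |r_i|, and each user's factor x solves (YtY + sum_i (c_i - 1) * y_i * y_i^t + lambda * I) x = sum over r_i > 0 of c_i * y_i. A dense single-user sketch of that solve follows; the object and method names are hypothetical, and the dense outer product stands in for the packed `dspr` updates the real code uses.

import org.jblas.{DoubleMatrix, Solve}
import scala.math.abs

object ImplicitUpdateSketch {
  // Hypothetical single-user analogue of the implicit branch in updateBlock.
  def implicitUserFactor(
      ratings: Array[(DoubleMatrix, Double)], // (product factor y_i as a column, rating r_i)
      YtY: DoubleMatrix,                      // precomputed Y^t Y, rank x rank
      rank: Int,
      lambda: Double,
      alpha: Double): DoubleMatrix = {
    val A = YtY.dup() // start from the shared YtY term
    val b = new DoubleMatrix(rank)
    for ((y, r) <- ratings) {
      // Confidence is a function of |r| so it is never negative.
      val confidence = 1.0 + alpha * abs(r)
      // Accumulate (c - 1) * y * y^t into A.
      A.addi(y.mmul(y.transpose()).muli(confidence - 1.0))
      // Only r > 0 corresponds to preference P = 1 and contributes to b.
      if (r > 0) b.addi(y.mul(confidence))
    }
    // Regularization: A += lambda * I.
    for (i <- 0 until rank) A.put(i, i, A.get(i, i) + lambda)
    Solve.solvePositive(A, b) // the system is symmetric positive-definite
  }
}

Calling this with rank-length column vectors for y_i reproduces, for a single user, what the blocked code above computes for an entire InLinkBlock.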