aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorNeville Li <neville@spotify.com>2014-06-04 01:51:34 -0700
committerXiangrui Meng <meng@databricks.com>2014-06-04 01:51:34 -0700
commitb8d25800393d0208a76813bcd94509ac24a3add5 (patch)
tree815f03566aa7a8811f4f26c45227c0a7a51191f3 /mllib
parentc402a4a685721d05932bbc578d997f330ff65a49 (diff)
downloadspark-b8d25800393d0208a76813bcd94509ac24a3add5.tar.gz
spark-b8d25800393d0208a76813bcd94509ac24a3add5.tar.bz2
spark-b8d25800393d0208a76813bcd94509ac24a3add5.zip
[MLLIB] set RDD names in ALS
This is very useful when debugging & fine-tuning jobs with large data sets. Author: Neville Li <neville@spotify.com> Closes #966 from nevillelyh/master and squashes the following commits: 6747764 [Neville Li] [MLLIB] use string interpolation for RDD names 3b15d34 [Neville Li] [MLLIB] set RDD names in ALS
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala16
1 file changed, 11 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index cfc3b68606..d743bd7dd1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -201,6 +201,10 @@ class ALS private (
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
val (productInLinks, productOutLinks) =
makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
+ userInLinks.setName("userInLinks")
+ userOutLinks.setName("userOutLinks")
+ productInLinks.setName("productInLinks")
+ productOutLinks.setName("productOutLinks")
// Initialize user and product factors randomly, but use a deterministic seed for each
// partition so that fault recovery works
@@ -225,14 +229,14 @@ class ALS private (
// perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
// Persist users because it will be called twice.
- users.persist()
+ users.setName(s"users-$iter").persist()
val YtY = Some(sc.broadcast(computeYtY(users)))
val previousProducts = products
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY)
previousProducts.unpersist()
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
- products.persist()
+ products.setName(s"products-$iter").persist()
val XtX = Some(sc.broadcast(computeYtY(products)))
val previousUsers = users
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
@@ -245,22 +249,24 @@ class ALS private (
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY = None)
+ products.setName(s"products-$iter")
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
alpha, YtY = None)
+ users.setName(s"users-$iter")
}
}
// The last `products` will be used twice. One to generate the last `users` and the other to
// generate `productsOut`. So we cache it for better performance.
- products.persist()
+ products.setName("products").persist()
// Flatten and cache the two final RDDs to un-block them
val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)
- usersOut.persist()
- productsOut.persist()
+ usersOut.setName("usersOut").persist()
+ productsOut.setName("productsOut").persist()
// Materialize usersOut and productsOut.
usersOut.count()