diff options
author | Neville Li <neville@spotify.com> | 2014-06-04 01:51:34 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-06-04 01:51:34 -0700 |
commit | b8d25800393d0208a76813bcd94509ac24a3add5 (patch) | |
tree | 815f03566aa7a8811f4f26c45227c0a7a51191f3 | |
parent | c402a4a685721d05932bbc578d997f330ff65a49 (diff) | |
download | spark-b8d25800393d0208a76813bcd94509ac24a3add5.tar.gz spark-b8d25800393d0208a76813bcd94509ac24a3add5.tar.bz2 spark-b8d25800393d0208a76813bcd94509ac24a3add5.zip |
[MLLIB] set RDD names in ALS
This is very useful when debugging & fine tuning jobs with large data sets.
Author: Neville Li <neville@spotify.com>
Closes #966 from nevillelyh/master and squashes the following commits:
6747764 [Neville Li] [MLLIB] use string interpolation for RDD names
3b15d34 [Neville Li] [MLLIB] set RDD names in ALS
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index cfc3b68606..d743bd7dd1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -201,6 +201,10 @@ class ALS private ( val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner) val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner) + userInLinks.setName("userInLinks") + userOutLinks.setName("userOutLinks") + productInLinks.setName("productInLinks") + productOutLinks.setName("productOutLinks") // Initialize user and product factors randomly, but use a deterministic seed for each // partition so that fault recovery works @@ -225,14 +229,14 @@ class ALS private ( // perform ALS update logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) // Persist users because it will be called twice. - users.persist() + users.setName(s"users-$iter").persist() val YtY = Some(sc.broadcast(computeYtY(users))) val previousProducts = products products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, alpha, YtY) previousProducts.unpersist() logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) - products.persist() + products.setName(s"products-$iter").persist() val XtX = Some(sc.broadcast(computeYtY(products))) val previousUsers = users users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, @@ -245,22 +249,24 @@ class ALS private ( logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, alpha, YtY = None) + products.setName(s"products-$iter") logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, alpha, YtY = None) + users.setName(s"users-$iter") } } // The last `products` will be used twice. One to generate the last `users` and the other to // generate `productsOut`. So we cache it for better performance. - products.persist() + products.setName("products").persist() // Flatten and cache the two final RDDs to un-block them val usersOut = unblockFactors(users, userOutLinks) val productsOut = unblockFactors(products, productOutLinks) - usersOut.persist() - productsOut.persist() + usersOut.setName("usersOut").persist() + productsOut.setName("productsOut").persist() // Materialize usersOut and productsOut. usersOut.count() |