aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuoQiang Li <witgo@qq.com>2014-07-30 11:00:11 -0700
committerXiangrui Meng <meng@databricks.com>2014-07-30 11:00:11 -0700
commitfc47bb6967e0df40870413e09d37aa9b90248f43 (patch)
treeb3cd4f7ff958ba8a25f66a84079c7ca333ba39d3
parente3d85b7e40073b05e2588583e9d8db11366c2f7b (diff)
downloadspark-fc47bb6967e0df40870413e09d37aa9b90248f43.tar.gz
spark-fc47bb6967e0df40870413e09d37aa9b90248f43.tar.bz2
spark-fc47bb6967e0df40870413e09d37aa9b90248f43.zip
[SPARK-2544][MLLIB] Improve ALS algorithm resource usage
Author: GuoQiang Li <witgo@qq.com> Author: witgo <witgo@qq.com> Closes #929 from witgo/improve_als and squashes the following commits: ea25033 [GuoQiang Li] checkpoint products 3,6,9 ... 154dccf [GuoQiang Li] checkpoint products only c5779ff [witgo] Improve ALS algorithm resource usage
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala6
1 files changed, 6 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 5356790cb5..d208cfb917 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -255,6 +255,9 @@ class ALS private (
rank, lambda, alpha, YtY)
previousProducts.unpersist()
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+ if (sc.checkpointDir.isDefined && (iter % 3 == 0)) {
+ products.checkpoint()
+ }
products.setName(s"products-$iter").persist()
val XtX = Some(sc.broadcast(computeYtY(products)))
val previousUsers = users
@@ -268,6 +271,9 @@ class ALS private (
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
rank, lambda, alpha, YtY = None)
+ if (sc.checkpointDir.isDefined && (iter % 3 == 0)) {
+ products.checkpoint()
+ }
products.setName(s"products-$iter")
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,