From fc47bb6967e0df40870413e09d37aa9b90248f43 Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Wed, 30 Jul 2014 11:00:11 -0700 Subject: [SPARK-2544][MLLIB] Improve ALS algorithm resource usage Author: GuoQiang Li Author: witgo Closes #929 from witgo/improve_als and squashes the following commits: ea25033 [GuoQiang Li] checkpoint products 3,6,9 ... 154dccf [GuoQiang Li] checkpoint products only c5779ff [witgo] Improve ALS algorithm resource usage --- .../src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 5356790cb5..d208cfb917 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -255,6 +255,9 @@ class ALS private ( rank, lambda, alpha, YtY) previousProducts.unpersist() logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) + if (sc.checkpointDir.isDefined && (iter % 3 == 0)) { + products.checkpoint() + } products.setName(s"products-$iter").persist() val XtX = Some(sc.broadcast(computeYtY(products))) val previousUsers = users @@ -268,6 +271,9 @@ class ALS private ( logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks, rank, lambda, alpha, YtY = None) + if (sc.checkpointDir.isDefined && (iter % 3 == 0)) { + products.checkpoint() + } products.setName(s"products-$iter") logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks, -- cgit v1.2.3