aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorFernando Otero (ZeoS) <fotero@gmail.com>2015-01-08 12:42:54 -0800
committerXiangrui Meng <meng@databricks.com>2015-01-08 12:42:54 -0800
commit72df5a301e706d9384f3a1c17b2c58b017632b1f (patch)
treedccede8a63578a7dbc9cf1d81be8c3a2b45948cf /mllib
parent538f221627930c8f8a138c0d21d9fa09bc789e67 (diff)
downloadspark-72df5a301e706d9384f3a1c17b2c58b017632b1f.tar.gz
spark-72df5a301e706d9384f3a1c17b2c58b017632b1f.tar.bz2
spark-72df5a301e706d9384f3a1c17b2c58b017632b1f.zip
SPARK-5148 [MLlib] Make usersOut/productsOut storagelevel in ALS configurable
Author: Fernando Otero (ZeoS) <fotero@gmail.com> Closes #3953 from zeitos/storageLevel and squashes the following commits: 0f070b9 [Fernando Otero (ZeoS)] fix imports 6869e80 [Fernando Otero (ZeoS)] fix comment length 90c9f7e [Fernando Otero (ZeoS)] fix comment length 18a992e [Fernando Otero (ZeoS)] changing storage level
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala18
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala27
2 files changed, 43 insertions, 2 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 90ac252226..bee951a2e5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -116,6 +116,7 @@ class ALS private (
/** storage level for user/product in/out links */
private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
+ private var finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
/**
* Set the number of blocks for both user blocks and product blocks to parallelize the computation
@@ -205,6 +206,19 @@ class ALS private (
}
/**
+ * :: DeveloperApi ::
+ * Sets storage level for final RDDs (user/product used in MatrixFactorizationModel). The default
+ * value is `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g.
+ * `MEMORY_AND_DISK_SER` and set `spark.rdd.compress` to `true` to reduce the space requirement,
+ * at the cost of speed.
+ */
+ @DeveloperApi
+ def setFinalRDDStorageLevel(storageLevel: StorageLevel): this.type = {
+ this.finalRDDStorageLevel = storageLevel
+ this
+ }
+
+ /**
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
*/
@@ -307,8 +321,8 @@ class ALS private (
val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)
- usersOut.setName("usersOut").persist(StorageLevel.MEMORY_AND_DISK)
- productsOut.setName("productsOut").persist(StorageLevel.MEMORY_AND_DISK)
+ usersOut.setName("usersOut").persist(finalRDDStorageLevel)
+ productsOut.setName("productsOut").persist(finalRDDStorageLevel)
// Materialize usersOut and productsOut.
usersOut.count()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 603d0ad127..f3b7bfda78 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.recommendation.ALS.BlockStats
+import org.apache.spark.storage.StorageLevel
object ALSSuite {
@@ -139,6 +140,32 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext {
assert(u11 != u2)
}
+ test("Storage Level for RDDs in model") {
+ val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2)
+ var storageLevel = StorageLevel.MEMORY_ONLY
+ var model = new ALS()
+ .setRank(5)
+ .setIterations(1)
+ .setLambda(1.0)
+ .setBlocks(2)
+ .setSeed(1)
+ .setFinalRDDStorageLevel(storageLevel)
+ .run(ratings)
+ assert(model.productFeatures.getStorageLevel == storageLevel);
+ assert(model.userFeatures.getStorageLevel == storageLevel);
+ storageLevel = StorageLevel.DISK_ONLY
+ model = new ALS()
+ .setRank(5)
+ .setIterations(1)
+ .setLambda(1.0)
+ .setBlocks(2)
+ .setSeed(1)
+ .setFinalRDDStorageLevel(storageLevel)
+ .run(ratings)
+ assert(model.productFeatures.getStorageLevel == storageLevel);
+ assert(model.userFeatures.getStorageLevel == storageLevel);
+ }
+
test("negative ids") {
val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false)
val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) =>