-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala |  12
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala       | 164
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala  |  60
-rw-r--r--  project/MimaExcludes.scala                                                  |   8
4 files changed, 160 insertions(+), 84 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 6eb41e7ba3..28e201d279 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -50,6 +50,8 @@ object MovieLensALS {
numIterations: Int = 20,
lambda: Double = 1.0,
rank: Int = 10,
+ numUserBlocks: Int = -1,
+ numProductBlocks: Int = -1,
implicitPrefs: Boolean = false)
def main(args: Array[String]) {
@@ -67,8 +69,14 @@ object MovieLensALS {
.text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
.action((x, c) => c.copy(lambda = x))
opt[Unit]("kryo")
- .text(s"use Kryo serialization")
+ .text("use Kryo serialization")
.action((_, c) => c.copy(kryo = true))
+ opt[Int]("numUserBlocks")
+ .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
+ .action((x, c) => c.copy(numUserBlocks = x))
+ opt[Int]("numProductBlocks")
+ .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
+ .action((x, c) => c.copy(numProductBlocks = x))
opt[Unit]("implicitPrefs")
.text("use implicit preference")
.action((_, c) => c.copy(implicitPrefs = true))
@@ -160,6 +168,8 @@ object MovieLensALS {
.setIterations(params.numIterations)
.setLambda(params.lambda)
.setImplicitPrefs(params.implicitPrefs)
+ .setUserBlocks(params.numUserBlocks)
+ .setProductBlocks(params.numProductBlocks)
.run(training)
val rmse = computeRmse(model, test, params.implicitPrefs)
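For orientation, a minimal sketch of driving the two new knobs directly against the MLlib API, outside the scopt plumbing of the example above. The object name, the local master, the input path, and the 4/2 block counts are illustrative placeholders, not part of this change:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object BlockedALS {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("BlockedALS").setMaster("local[4]"))
    // Ratings file in "userId::productId::rating" form; the path is a placeholder.
    val ratings = sc.textFile("data/ratings.dat").map { line =>
      val Array(user, product, rating) = line.split("::").take(3)
      Rating(user.toInt, product.toInt, rating.toDouble)
    }
    // -1 keeps the auto-configured block count; any positive value overrides it per side.
    val model = new ALS()
      .setRank(10)
      .setIterations(20)
      .setLambda(1.0)
      .setUserBlocks(4)     // split the user side into 4 blocks
      .setProductBlocks(2)  // split the product side into 2 blocks
      .run(ratings)
    println(s"Trained ${model.userFeatures.count()} user factors and " +
      s"${model.productFeatures.count()} product factors")
    sc.stop()
  }
}
```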
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index d743bd7dd1..f1ae7b85b4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -61,7 +61,7 @@ private[recommendation] case class InLinkBlock(
* A more compact class to represent a rating than Tuple3[Int, Int, Double].
*/
@Experimental
-case class Rating(val user: Int, val product: Int, val rating: Double)
+case class Rating(user: Int, product: Int, rating: Double)
/**
* Alternating Least Squares matrix factorization.
@@ -93,7 +93,8 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
* preferences rather than explicit ratings given to items.
*/
class ALS private (
- private var numBlocks: Int,
+ private var numUserBlocks: Int,
+ private var numProductBlocks: Int,
private var rank: Int,
private var iterations: Int,
private var lambda: Double,
@@ -106,14 +107,31 @@ class ALS private (
* Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10,
* lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
*/
- def this() = this(-1, 10, 10, 0.01, false, 1.0)
+ def this() = this(-1, -1, 10, 10, 0.01, false, 1.0)
/**
- * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
- * number of blocks. Default: -1.
+ * Set the number of blocks for both user blocks and product blocks to parallelize the computation
+ * into; pass -1 for an auto-configured number of blocks. Default: -1.
*/
def setBlocks(numBlocks: Int): ALS = {
- this.numBlocks = numBlocks
+ this.numUserBlocks = numBlocks
+ this.numProductBlocks = numBlocks
+ this
+ }
+
+ /**
+ * Set the number of user blocks to parallelize the computation.
+ */
+ def setUserBlocks(numUserBlocks: Int): ALS = {
+ this.numUserBlocks = numUserBlocks
+ this
+ }
+
+ /**
+ * Set the number of product blocks to parallelize the computation.
+ */
+ def setProductBlocks(numProductBlocks: Int): ALS = {
+ this.numProductBlocks = numProductBlocks
this
}
@@ -176,31 +194,32 @@ class ALS private (
def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
val sc = ratings.context
- val numBlocks = if (this.numBlocks == -1) {
+ val numUserBlocks = if (this.numUserBlocks == -1) {
math.max(sc.defaultParallelism, ratings.partitions.size / 2)
} else {
- this.numBlocks
+ this.numUserBlocks
}
-
- val partitioner = new Partitioner {
- val numPartitions = numBlocks
-
- def getPartition(x: Any): Int = {
- Utils.nonNegativeMod(byteswap32(x.asInstanceOf[Int]), numPartitions)
- }
+ val numProductBlocks = if (this.numProductBlocks == -1) {
+ math.max(sc.defaultParallelism, ratings.partitions.size / 2)
+ } else {
+ this.numProductBlocks
}
- val ratingsByUserBlock = ratings.map{ rating =>
- (partitioner.getPartition(rating.user), rating)
+ val userPartitioner = new ALSPartitioner(numUserBlocks)
+ val productPartitioner = new ALSPartitioner(numProductBlocks)
+
+ val ratingsByUserBlock = ratings.map { rating =>
+ (userPartitioner.getPartition(rating.user), rating)
}
- val ratingsByProductBlock = ratings.map{ rating =>
- (partitioner.getPartition(rating.product),
+ val ratingsByProductBlock = ratings.map { rating =>
+ (productPartitioner.getPartition(rating.product),
Rating(rating.product, rating.user, rating.rating))
}
- val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
+ val (userInLinks, userOutLinks) =
+ makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, productPartitioner)
val (productInLinks, productOutLinks) =
- makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
+ makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, userPartitioner)
userInLinks.setName("userInLinks")
userOutLinks.setName("userOutLinks")
productInLinks.setName("productInLinks")
@@ -232,27 +251,27 @@ class ALS private (
users.setName(s"users-$iter").persist()
val YtY = Some(sc.broadcast(computeYtY(users)))
val previousProducts = products
- products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
- alpha, YtY)
+ products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
+ userPartitioner, rank, lambda, alpha, YtY)
previousProducts.unpersist()
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
products.setName(s"products-$iter").persist()
val XtX = Some(sc.broadcast(computeYtY(products)))
val previousUsers = users
- users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
- alpha, XtX)
+ users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
+ productPartitioner, rank, lambda, alpha, XtX)
previousUsers.unpersist()
}
} else {
for (iter <- 1 to iterations) {
// perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
- products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
- alpha, YtY = None)
+ products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
+ userPartitioner, rank, lambda, alpha, YtY = None)
products.setName(s"products-$iter")
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
- users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
- alpha, YtY = None)
+ users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
+ productPartitioner, rank, lambda, alpha, YtY = None)
users.setName(s"users-$iter")
}
}
@@ -340,9 +359,10 @@ class ALS private (
/**
* Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
*/
- private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
- outLinks: RDD[(Int, OutLinkBlock)]) = {
- blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
+ private def unblockFactors(
+ blockedFactors: RDD[(Int, Array[Array[Double]])],
+ outLinks: RDD[(Int, OutLinkBlock)]): RDD[(Int, Array[Double])] = {
+ blockedFactors.join(outLinks).flatMap { case (b, (factors, outLinkBlock)) =>
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
}
}
@@ -351,14 +371,14 @@ class ALS private (
* Make the out-links table for a block of the users (or products) dataset given the list of
* (user, product, rating) values for the users in that block (or the opposite for products).
*/
- private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating],
- partitioner: Partitioner): OutLinkBlock = {
+ private def makeOutLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
+ productPartitioner: Partitioner): OutLinkBlock = {
val userIds = ratings.map(_.user).distinct.sorted
val numUsers = userIds.length
val userIdToPos = userIds.zipWithIndex.toMap
- val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
+ val shouldSend = Array.fill(numUsers)(new BitSet(numProductBlocks))
for (r <- ratings) {
- shouldSend(userIdToPos(r.user))(partitioner.getPartition(r.product)) = true
+ shouldSend(userIdToPos(r.user))(productPartitioner.getPartition(r.product)) = true
}
OutLinkBlock(userIds, shouldSend)
}
@@ -367,18 +387,17 @@ class ALS private (
* Make the in-links table for a block of the users (or products) dataset given a list of
* (user, product, rating) values for the users in that block (or the opposite for products).
*/
- private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating],
- partitioner: Partitioner): InLinkBlock = {
+ private def makeInLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
+ productPartitioner: Partitioner): InLinkBlock = {
val userIds = ratings.map(_.user).distinct.sorted
- val numUsers = userIds.length
val userIdToPos = userIds.zipWithIndex.toMap
// Split out our ratings by product block
- val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
+ val blockRatings = Array.fill(numProductBlocks)(new ArrayBuffer[Rating])
for (r <- ratings) {
- blockRatings(partitioner.getPartition(r.product)) += r
+ blockRatings(productPartitioner.getPartition(r.product)) += r
}
- val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
- for (productBlock <- 0 until numBlocks) {
+ val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numProductBlocks)
+ for (productBlock <- 0 until numProductBlocks) {
// Create an array of (product, Seq(Rating)) ratings
val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
// Sort them by product ID
@@ -400,14 +419,16 @@ class ALS private (
* the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
* having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
*/
- private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)], partitioner: Partitioner)
- : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
- {
- val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
+ private def makeLinkRDDs(
+ numUserBlocks: Int,
+ numProductBlocks: Int,
+ ratingsByUserBlock: RDD[(Int, Rating)],
+ productPartitioner: Partitioner): (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) = {
+ val grouped = ratingsByUserBlock.partitionBy(new HashPartitioner(numUserBlocks))
val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
- val ratings = elements.map{_._2}.toArray
- val inLinkBlock = makeInLinkBlock(numBlocks, ratings, partitioner)
- val outLinkBlock = makeOutLinkBlock(numBlocks, ratings, partitioner)
+ val ratings = elements.map(_._2).toArray
+ val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner)
+ val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner)
Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
}, true)
val inLinks = links.mapValues(_._1)
@@ -439,26 +460,24 @@ class ALS private (
* It returns an RDD of new feature vectors for each user block.
*/
private def updateFeatures(
+ numUserBlocks: Int,
products: RDD[(Int, Array[Array[Double]])],
productOutLinks: RDD[(Int, OutLinkBlock)],
userInLinks: RDD[(Int, InLinkBlock)],
- partitioner: Partitioner,
+ productPartitioner: Partitioner,
rank: Int,
lambda: Double,
alpha: Double,
- YtY: Option[Broadcast[DoubleMatrix]])
- : RDD[(Int, Array[Array[Double]])] =
- {
- val numBlocks = products.partitions.size
+ YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
- val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
- for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
+ val toSend = Array.fill(numUserBlocks)(new ArrayBuffer[Array[Double]])
+ for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numUserBlocks) {
if (outLinkBlock.shouldSend(p)(userBlock)) {
toSend(userBlock) += factors(p)
}
}
toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
- }.groupByKey(partitioner)
+ }.groupByKey(productPartitioner)
.join(userInLinks)
.mapValues{ case (messages, inLinkBlock) =>
updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
@@ -475,7 +494,7 @@ class ALS private (
{
// Sort the incoming block factor messages by block ID and make them an array
val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
- val numBlocks = blockFactors.length
+ val numProductBlocks = blockFactors.length
val numUsers = inLinkBlock.elementIds.length
// We'll sum up the XtXes using vectors that represent only the lower-triangular part, since
@@ -490,7 +509,7 @@ class ALS private (
// Compute the XtX and Xy values for each user by adding products it rated in each product
// block
- for (productBlock <- 0 until numBlocks) {
+ for (productBlock <- 0 until numProductBlocks) {
var p = 0
while (p < blockFactors(productBlock).length) {
val x = wrapDoubleArray(blockFactors(productBlock)(p))
@@ -579,6 +598,23 @@ class ALS private (
}
}
+/**
+ * Partitioner for ALS.
+ */
+private[recommendation] class ALSPartitioner(override val numPartitions: Int) extends Partitioner {
+ override def getPartition(key: Any): Int = {
+ Utils.nonNegativeMod(byteswap32(key.asInstanceOf[Int]), numPartitions)
+ }
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case p: ALSPartitioner =>
+ this.numPartitions == p.numPartitions
+ case _ =>
+ false
+ }
+ }
+}
/**
* Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
@@ -606,7 +642,7 @@ object ALS {
blocks: Int,
seed: Long
): MatrixFactorizationModel = {
- new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
+ new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
}
/**
@@ -629,7 +665,7 @@ object ALS {
lambda: Double,
blocks: Int
): MatrixFactorizationModel = {
- new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
+ new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0).run(ratings)
}
/**
@@ -689,7 +725,7 @@ object ALS {
alpha: Double,
seed: Long
): MatrixFactorizationModel = {
- new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
+ new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
}
/**
@@ -714,7 +750,7 @@ object ALS {
blocks: Int,
alpha: Double
): MatrixFactorizationModel = {
- new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
+ new ALS(blocks, blocks, rank, iterations, lambda, true, alpha).run(ratings)
}
/**
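The new ALSPartitioner introduced at the end of this file is private to MLlib, but the rule it applies is simple to restate outside Spark: scramble the integer id with byteswap32, then take a non-negative modulus. A self-contained sketch of that rule under stated assumptions (nonNegativeMod below is a stand-in for Spark's private Utils.nonNegativeMod, and the ids and block counts are illustrative):

```scala
import scala.util.hashing.byteswap32

object BlockIdSketch {
  // Stand-in for org.apache.spark.util.Utils.nonNegativeMod, which is private to Spark.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Same rule as ALSPartitioner.getPartition: scramble the id, then map it to a bucket.
  def blockId(id: Int, numBlocks: Int): Int = nonNegativeMod(byteswap32(id), numBlocks)

  def main(args: Array[String]) {
    // The same id can land in different buckets on the two sides, because the user and
    // product partitioners now carry independent partition counts.
    println(blockId(42, numBlocks = 4))  // user-side block of id 42 with 4 user blocks
    println(blockId(42, numBlocks = 2))  // product-side block of id 42 with 2 product blocks
  }
}
```

Defining equals purely in terms of numPartitions presumably lets RDDs keyed by equally sized ALSPartitioners be treated as co-partitioned, so the groupByKey/join chain in updateFeatures can skip a redundant shuffle.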
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 37c9b9d085..81bebec8c7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -121,6 +121,10 @@ class ALSSuite extends FunSuite with LocalSparkContext {
testALS(100, 200, 2, 15, 0.7, 0.4, true, false, true)
}
+ test("rank-2 matrices with different user and product blocks") {
+ testALS(100, 200, 2, 15, 0.7, 0.4, numUserBlocks = 4, numProductBlocks = 2)
+ }
+
test("pseudorandomness") {
val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2)
val model11 = ALS.train(ratings, 5, 1, 1.0, 2, 1)
@@ -153,35 +157,52 @@ class ALSSuite extends FunSuite with LocalSparkContext {
}
test("NNALS, rank 2") {
- testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false)
+ testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, -1, false)
}
/**
* Test if we can correctly factorize R = U * P where U and P are of known rank.
*
- * @param users number of users
- * @param products number of products
- * @param features number of features (rank of problem)
- * @param iterations number of iterations to run
- * @param samplingRate what fraction of the user-product pairs are known
+ * @param users number of users
+ * @param products number of products
+ * @param features number of features (rank of problem)
+ * @param iterations number of iterations to run
+ * @param samplingRate what fraction of the user-product pairs are known
* @param matchThreshold max difference allowed to consider a predicted rating correct
- * @param implicitPrefs flag to test implicit feedback
- * @param bulkPredict flag to test bulk prediciton
+ * @param implicitPrefs flag to test implicit feedback
+ * @param bulkPredict flag to test bulk prediction
* @param negativeWeights whether the generated data can contain negative values
- * @param numBlocks number of blocks to partition users and products into
+ * @param numUserBlocks number of user blocks to partition users into
+ * @param numProductBlocks number of product blocks to partition products into
* @param negativeFactors whether the generated user/product factors can have negative entries
*/
- def testALS(users: Int, products: Int, features: Int, iterations: Int,
- samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
- bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
- negativeFactors: Boolean = true)
- {
+ def testALS(
+ users: Int,
+ products: Int,
+ features: Int,
+ iterations: Int,
+ samplingRate: Double,
+ matchThreshold: Double,
+ implicitPrefs: Boolean = false,
+ bulkPredict: Boolean = false,
+ negativeWeights: Boolean = false,
+ numUserBlocks: Int = -1,
+ numProductBlocks: Int = -1,
+ negativeFactors: Boolean = true) {
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
features, samplingRate, implicitPrefs, negativeWeights, negativeFactors)
- val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations)
- .setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L)
- .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings)))
+ val model = new ALS()
+ .setUserBlocks(numUserBlocks)
+ .setProductBlocks(numProductBlocks)
+ .setRank(features)
+ .setIterations(iterations)
+ .setAlpha(1.0)
+ .setImplicitPrefs(implicitPrefs)
+ .setLambda(0.01)
+ .setSeed(0L)
+ .setNonnegative(!negativeFactors)
+ .run(sc.parallelize(sampledRatings))
val predictedU = new DoubleMatrix(users, features)
for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
@@ -208,8 +229,9 @@ class ALSSuite extends FunSuite with LocalSparkContext {
val prediction = predictedRatings.get(u, p)
val correct = trueRatings.get(u, p)
if (math.abs(prediction - correct) > matchThreshold) {
- fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
- u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP))
+ fail(("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s")
+ .format(u, p, correct, prediction, trueRatings, predictedRatings, predictedU,
+ predictedP))
}
}
} else {
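Outside the test harness, the two configuration paths relate as follows: setBlocks(n) is now shorthand for setting both sides to n, while setUserBlocks/setProductBlocks let the sides diverge, which is what the new "different user and product blocks" test exercises. A small sketch of both paths (the block counts, rank, and iteration counts are illustrative, and the ratings RDD is assumed to exist):

```scala
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

object BlockConfigSketch {
  // `ratings` is assumed to be an existing RDD[Rating]; how it was built does not matter here.
  def fitBothWays(ratings: RDD[Rating]) = {
    // Single knob: user and product sides both get 8 blocks (the pre-existing behaviour).
    val symmetric = new ALS()
      .setBlocks(8)
      .setRank(10)
      .setIterations(10)
      .run(ratings)
    // Separate knobs: 8 user blocks, 4 product blocks, everything else identical.
    val asymmetric = new ALS()
      .setUserBlocks(8)
      .setProductBlocks(4)
      .setRank(10)
      .setIterations(10)
      .run(ratings)
    (symmetric, asymmetric)
  }
}
```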
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dd7efceb23..c80ab9a9f8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,6 +54,14 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1")
) ++
+ Seq( // Ignore some private methods in ALS.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
+ ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
+ "org.apache.spark.mllib.recommendation.ALS.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
+ ) ++
MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")