From 0cc7b88c99405db99bc4c3d66f5409e5da0e3c6e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 2 Feb 2015 23:49:09 -0800 Subject: [SPARK-5536] replace old ALS implementation by the new one The only issue is that `analyzeBlock` is removed, which was marked as a developer API. I didn't change other tests in the ALSSuite under `spark.mllib` to ensure that the implementation is correct. CC: srowen coderxiang Author: Xiangrui Meng Closes #4321 from mengxr/SPARK-5536 and squashes the following commits: 5a3cee8 [Xiangrui Meng] update python tests that are too strict e840acf [Xiangrui Meng] ignore scala style check for ALS.train e9a721c [Xiangrui Meng] update mima excludes 9ee6a36 [Xiangrui Meng] merge master 9a8aeac [Xiangrui Meng] update tests d8c3271 [Xiangrui Meng] remove analyzeBlocks d68eee7 [Xiangrui Meng] add checkpoint to new ALS 22a56f8 [Xiangrui Meng] wrap old ALS c387dff [Xiangrui Meng] support random seed 3bdf24b [Xiangrui Meng] make storage level configurable in the new ALS --- .../org/apache/spark/ml/recommendation/ALS.scala | 69 ++- .../apache/spark/mllib/recommendation/ALS.scala | 600 ++------------------- .../apache/spark/ml/recommendation/ALSSuite.scala | 2 +- .../spark/mllib/recommendation/ALSSuite.scala | 18 - 4 files changed, 76 insertions(+), 613 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 82d21d5e4c..511cb2fe40 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -22,6 +22,7 @@ import java.{util => ju} import scala.collection.mutable import scala.reflect.ClassTag import scala.util.Sorting +import scala.util.hashing.byteswap64 import com.github.fommil.netlib.BLAS.{getInstance => blas} import com.github.fommil.netlib.LAPACK.{getInstance => lapack} @@ -37,6 +38,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Dsl._ import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType} +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter} import org.apache.spark.util.random.XORShiftRandom @@ -412,7 +414,7 @@ object ALS extends Logging { /** * Implementation of the ALS algorithm. */ - def train[ID: ClassTag]( + def train[ID: ClassTag]( // scalastyle:ignore ratings: RDD[Rating[ID]], rank: Int = 10, numUserBlocks: Int = 10, @@ -421,34 +423,47 @@ object ALS extends Logging { regParam: Double = 1.0, implicitPrefs: Boolean = false, alpha: Double = 1.0, - nonnegative: Boolean = false)( + nonnegative: Boolean = false, + intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK, + finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK, + seed: Long = 0L)( implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = { + require(intermediateRDDStorageLevel != StorageLevel.NONE, + "ALS is not designed to run without persisting intermediate RDDs.") + val sc = ratings.sparkContext val userPart = new HashPartitioner(numUserBlocks) val itemPart = new HashPartitioner(numItemBlocks) val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions) val solver = if (nonnegative) new NNLSSolver else new CholeskySolver - val blockRatings = partitionRatings(ratings, userPart, itemPart).cache() - val (userInBlocks, userOutBlocks) = makeBlocks("user", blockRatings, userPart, itemPart) + val blockRatings = partitionRatings(ratings, userPart, itemPart) + .persist(intermediateRDDStorageLevel) + val (userInBlocks, userOutBlocks) = + makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel) // materialize blockRatings and user blocks userOutBlocks.count() val swappedBlockRatings = blockRatings.map { case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) => ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings)) } - val (itemInBlocks, itemOutBlocks) = makeBlocks("item", swappedBlockRatings, itemPart, userPart) + val (itemInBlocks, itemOutBlocks) = + makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel) // materialize item blocks itemOutBlocks.count() - var userFactors = initialize(userInBlocks, rank) - var itemFactors = initialize(itemInBlocks, rank) + val seedGen = new XORShiftRandom(seed) + var userFactors = initialize(userInBlocks, rank, seedGen.nextLong()) + var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong()) if (implicitPrefs) { for (iter <- 1 to maxIter) { - userFactors.setName(s"userFactors-$iter").persist() + userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel) val previousItemFactors = itemFactors itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam, userLocalIndexEncoder, implicitPrefs, alpha, solver) previousItemFactors.unpersist() - itemFactors.setName(s"itemFactors-$iter").persist() + if (sc.checkpointDir.isDefined && (iter % 3 == 0)) { + itemFactors.checkpoint() + } + itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel) val previousUserFactors = userFactors userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam, itemLocalIndexEncoder, implicitPrefs, alpha, solver) @@ -467,21 +482,23 @@ object ALS extends Logging { .join(userFactors) .values .setName("userFactors") - .cache() - userIdAndFactors.count() - itemFactors.unpersist() + .persist(finalRDDStorageLevel) val itemIdAndFactors = itemInBlocks .mapValues(_.srcIds) .join(itemFactors) .values .setName("itemFactors") - .cache() - itemIdAndFactors.count() - userInBlocks.unpersist() - userOutBlocks.unpersist() - itemInBlocks.unpersist() - itemOutBlocks.unpersist() - blockRatings.unpersist() + .persist(finalRDDStorageLevel) + if (finalRDDStorageLevel != StorageLevel.NONE) { + userIdAndFactors.count() + itemFactors.unpersist() + itemIdAndFactors.count() + userInBlocks.unpersist() + userOutBlocks.unpersist() + itemInBlocks.unpersist() + itemOutBlocks.unpersist() + blockRatings.unpersist() + } val userOutput = userIdAndFactors.flatMap { case (ids, factors) => ids.view.zip(factors) } @@ -546,14 +563,15 @@ object ALS extends Logging { */ private def initialize[ID]( inBlocks: RDD[(Int, InBlock[ID])], - rank: Int): RDD[(Int, FactorBlock)] = { + rank: Int, + seed: Long): RDD[(Int, FactorBlock)] = { // Choose a unit vector uniformly at random from the unit sphere, but from the // "first quadrant" where all elements are nonnegative. This can be done by choosing // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing. // This appears to create factorizations that have a slightly better reconstruction // (<1%) compared picking elements uniformly at random in [0,1]. inBlocks.map { case (srcBlockId, inBlock) => - val random = new XORShiftRandom(srcBlockId) + val random = new XORShiftRandom(byteswap64(seed ^ srcBlockId)) val factors = Array.fill(inBlock.srcIds.length) { val factor = Array.fill(rank)(random.nextGaussian().toFloat) val nrm = blas.snrm2(rank, factor, 1) @@ -877,7 +895,8 @@ object ALS extends Logging { prefix: String, ratingBlocks: RDD[((Int, Int), RatingBlock[ID])], srcPart: Partitioner, - dstPart: Partitioner)( + dstPart: Partitioner, + storageLevel: StorageLevel)( implicit srcOrd: Ordering[ID]): (RDD[(Int, InBlock[ID])], RDD[(Int, OutBlock)]) = { val inBlocks = ratingBlocks.map { case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) => @@ -914,7 +933,8 @@ object ALS extends Logging { builder.add(dstBlockId, srcIds, dstLocalIndices, ratings) } builder.build().compress() - }.setName(prefix + "InBlocks").cache() + }.setName(prefix + "InBlocks") + .persist(storageLevel) val outBlocks = inBlocks.mapValues { case InBlock(srcIds, dstPtrs, dstEncodedIndices, _) => val encoder = new LocalIndexEncoder(dstPart.numPartitions) val activeIds = Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int]) @@ -936,7 +956,8 @@ object ALS extends Logging { activeIds.map { x => x.result() } - }.setName(prefix + "OutBlocks").cache() + }.setName(prefix + "OutBlocks") + .persist(storageLevel) (inBlocks, outBlocks) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index a5ffe888ca..f4f51f2ac5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -17,46 +17,12 @@ package org.apache.spark.mllib.recommendation -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.math.{abs, sqrt} -import scala.util.{Random, Sorting} -import scala.util.hashing.byteswap32 - -import org.jblas.{DoubleMatrix, SimpleBlas, Solve} - -import org.apache.spark.{HashPartitioner, Logging, Partitioner} -import org.apache.spark.SparkContext._ +import org.apache.spark.Logging import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.mllib.optimization.NNLS +import org.apache.spark.ml.recommendation.{ALS => NewALS} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.Utils - -/** - * Out-link information for a user or product block. This includes the original user/product IDs - * of the elements within this block, and the list of destination blocks that each user or - * product will need to send its feature vector to. - */ -private[recommendation] -case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[mutable.BitSet]) - - -/** - * In-link information for a user (or product) block. This includes the original user/product IDs - * of the elements within this block, as well as an array of indices and ratings that specify - * which user in the block will be rated by which products from each product block (or vice-versa). - * Specifically, if this InLinkBlock is for users, ratingsForBlock(b)(i) will contain two arrays, - * indices and ratings, for the i'th product that will be sent to us by product block b (call this - * P). These arrays represent the users that product P had ratings for (by their index in this - * block), as well as the corresponding rating for each one. We can thus use this information when - * we get product block b's message to update the corresponding users. - */ -private[recommendation] case class InLinkBlock( - elementIds: Array[Int], ratingsForBlock: Array[Array[(Array[Int], Array[Double])]]) - /** * :: Experimental :: @@ -201,6 +167,8 @@ class ALS private ( */ @DeveloperApi def setIntermediateRDDStorageLevel(storageLevel: StorageLevel): this.type = { + require(storageLevel != StorageLevel.NONE, + "ALS is not designed to run without persisting intermediate RDDs.") this.intermediateRDDStorageLevel = storageLevel this } @@ -236,431 +204,39 @@ class ALS private ( this.numProductBlocks } - val userPartitioner = new ALSPartitioner(numUserBlocks) - val productPartitioner = new ALSPartitioner(numProductBlocks) - - val ratingsByUserBlock = ratings.map { rating => - (userPartitioner.getPartition(rating.user), rating) - } - val ratingsByProductBlock = ratings.map { rating => - (productPartitioner.getPartition(rating.product), - Rating(rating.product, rating.user, rating.rating)) - } - - val (userInLinks, userOutLinks) = - makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, productPartitioner) - val (productInLinks, productOutLinks) = - makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, userPartitioner) - userInLinks.setName("userInLinks") - userOutLinks.setName("userOutLinks") - productInLinks.setName("productInLinks") - productOutLinks.setName("productOutLinks") - - // Initialize user and product factors randomly, but use a deterministic seed for each - // partition so that fault recovery works - val seedGen = new Random(seed) - val seed1 = seedGen.nextInt() - val seed2 = seedGen.nextInt() - var users = userOutLinks.mapPartitionsWithIndex { (index, itr) => - val rand = new Random(byteswap32(seed1 ^ index)) - itr.map { case (x, y) => - (x, y.elementIds.map(_ => randomFactor(rank, rand))) - } - } - var products = productOutLinks.mapPartitionsWithIndex { (index, itr) => - val rand = new Random(byteswap32(seed2 ^ index)) - itr.map { case (x, y) => - (x, y.elementIds.map(_ => randomFactor(rank, rand))) - } - } - - if (implicitPrefs) { - for (iter <- 1 to iterations) { - // perform ALS update - logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) - // Persist users because it will be called twice. - users.setName(s"users-$iter").persist() - val YtY = Some(sc.broadcast(computeYtY(users))) - val previousProducts = products - products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks, - rank, lambda, alpha, YtY) - previousProducts.unpersist() - logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) - if (sc.checkpointDir.isDefined && (iter % 3 == 0)) { - products.checkpoint() - } - products.setName(s"products-$iter").persist() - val XtX = Some(sc.broadcast(computeYtY(products))) - val previousUsers = users - users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks, - rank, lambda, alpha, XtX) - previousUsers.unpersist() - } - } else { - for (iter <- 1 to iterations) { - // perform ALS update - logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) - products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks, - rank, lambda, alpha, YtY = None) - if (sc.checkpointDir.isDefined && (iter % 3 == 0)) { - products.checkpoint() - } - products.setName(s"products-$iter") - logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) - users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks, - rank, lambda, alpha, YtY = None) - users.setName(s"users-$iter") - } + val (floatUserFactors, floatProdFactors) = NewALS.train[Int]( + ratings = ratings.map(r => NewALS.Rating(r.user, r.product, r.rating.toFloat)), + rank = rank, + numUserBlocks = numUserBlocks, + numItemBlocks = numProductBlocks, + maxIter = iterations, + regParam = lambda, + implicitPrefs = implicitPrefs, + alpha = alpha, + nonnegative = nonnegative, + intermediateRDDStorageLevel = intermediateRDDStorageLevel, + finalRDDStorageLevel = StorageLevel.NONE, + seed = seed) + + val userFactors = floatUserFactors + .mapValues(_.map(_.toDouble)) + .setName("users") + .persist(finalRDDStorageLevel) + val prodFactors = floatProdFactors + .mapValues(_.map(_.toDouble)) + .setName("products") + .persist(finalRDDStorageLevel) + if (finalRDDStorageLevel != StorageLevel.NONE) { + userFactors.count() + prodFactors.count() } - - // The last `products` will be used twice. One to generate the last `users` and the other to - // generate `productsOut`. So we cache it for better performance. - products.setName("products").persist() - - // Flatten and cache the two final RDDs to un-block them - val usersOut = unblockFactors(users, userOutLinks) - val productsOut = unblockFactors(products, productOutLinks) - - usersOut.setName("usersOut").persist(finalRDDStorageLevel) - productsOut.setName("productsOut").persist(finalRDDStorageLevel) - - // Materialize usersOut and productsOut. - usersOut.count() - productsOut.count() - - products.unpersist() - - // Clean up. - userInLinks.unpersist() - userOutLinks.unpersist() - productInLinks.unpersist() - productOutLinks.unpersist() - - new MatrixFactorizationModel(rank, usersOut, productsOut) + new MatrixFactorizationModel(rank, userFactors, prodFactors) } /** * Java-friendly version of [[ALS.run]]. */ def run(ratings: JavaRDD[Rating]): MatrixFactorizationModel = run(ratings.rdd) - - /** - * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors - * for each user (or product), in a distributed fashion. - * - * @param factors the (block-distributed) user or product factor vectors - * @return YtY - whose value is only used in the implicit preference model - */ - private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { - val n = rank * (rank + 1) / 2 - val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => { - Y.foreach(y => dspr(1.0, wrapDoubleArray(y), L)) - L - }, combOp = (L1, L2) => { - L1.addi(L2) - }) - val YtY = new DoubleMatrix(rank, rank) - fillFullMatrix(LYtY, YtY) - YtY - } - - /** - * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR. - * - * @param L the lower triangular part of the matrix packed in an array (row major) - */ - private def dspr(alpha: Double, x: DoubleMatrix, L: DoubleMatrix) = { - val n = x.length - var i = 0 - var j = 0 - var idx = 0 - var axi = 0.0 - val xd = x.data - val Ld = L.data - while (i < n) { - axi = alpha * xd(i) - j = 0 - while (j <= i) { - Ld(idx) += axi * xd(j) - j += 1 - idx += 1 - } - i += 1 - } - } - - /** - * Wrap a double array in a DoubleMatrix without creating garbage. - * This is a temporary fix for jblas 1.2.3; it should be safe to move back to the - * DoubleMatrix(double[]) constructor come jblas 1.2.4. - */ - private def wrapDoubleArray(v: Array[Double]): DoubleMatrix = { - new DoubleMatrix(v.length, 1, v: _*) - } - - /** - * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs - */ - private def unblockFactors( - blockedFactors: RDD[(Int, Array[Array[Double]])], - outLinks: RDD[(Int, OutLinkBlock)]): RDD[(Int, Array[Double])] = { - blockedFactors.join(outLinks).flatMap { case (b, (factors, outLinkBlock)) => - for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) - } - } - - /** - * Make the out-links table for a block of the users (or products) dataset given the list of - * (user, product, rating) values for the users in that block (or the opposite for products). - */ - private def makeOutLinkBlock(numProductBlocks: Int, ratings: Array[Rating], - productPartitioner: Partitioner): OutLinkBlock = { - val userIds = ratings.map(_.user).distinct.sorted - val numUsers = userIds.length - val userIdToPos = userIds.zipWithIndex.toMap - val shouldSend = Array.fill(numUsers)(new mutable.BitSet(numProductBlocks)) - for (r <- ratings) { - shouldSend(userIdToPos(r.user))(productPartitioner.getPartition(r.product)) = true - } - OutLinkBlock(userIds, shouldSend) - } - - /** - * Make the in-links table for a block of the users (or products) dataset given a list of - * (user, product, rating) values for the users in that block (or the opposite for products). - */ - private def makeInLinkBlock(numProductBlocks: Int, ratings: Array[Rating], - productPartitioner: Partitioner): InLinkBlock = { - val userIds = ratings.map(_.user).distinct.sorted - val userIdToPos = userIds.zipWithIndex.toMap - // Split out our ratings by product block - val blockRatings = Array.fill(numProductBlocks)(new ArrayBuffer[Rating]) - for (r <- ratings) { - blockRatings(productPartitioner.getPartition(r.product)) += r - } - val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numProductBlocks) - for (productBlock <- 0 until numProductBlocks) { - // Create an array of (product, Seq(Rating)) ratings - val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray - // Sort them by product ID - val ordering = new Ordering[(Int, ArrayBuffer[Rating])] { - def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int = - a._1 - b._1 - } - Sorting.quickSort(groupedRatings)(ordering) - // Translate the user IDs to indices based on userIdToPos - ratingsForBlock(productBlock) = groupedRatings.map { case (p, rs) => - (rs.view.map(r => userIdToPos(r.user)).toArray, rs.view.map(_.rating).toArray) - } - } - InLinkBlock(userIds, ratingsForBlock) - } - - /** - * Make RDDs of InLinkBlocks and OutLinkBlocks given an RDD of (blockId, (u, p, r)) values for - * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid - * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it. - */ - private def makeLinkRDDs( - numUserBlocks: Int, - numProductBlocks: Int, - ratingsByUserBlock: RDD[(Int, Rating)], - productPartitioner: Partitioner): (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) = { - val grouped = ratingsByUserBlock.partitionBy(new HashPartitioner(numUserBlocks)) - val links = grouped.mapPartitionsWithIndex((blockId, elements) => { - val ratings = elements.map(_._2).toArray - val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner) - val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner) - Iterator.single((blockId, (inLinkBlock, outLinkBlock))) - }, preservesPartitioning = true) - val inLinks = links.mapValues(_._1) - val outLinks = links.mapValues(_._2) - inLinks.persist(intermediateRDDStorageLevel) - outLinks.persist(intermediateRDDStorageLevel) - (inLinks, outLinks) - } - - /** - * Make a random factor vector with the given random. - */ - private def randomFactor(rank: Int, rand: Random): Array[Double] = { - // Choose a unit vector uniformly at random from the unit sphere, but from the - // "first quadrant" where all elements are nonnegative. This can be done by choosing - // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing. - // This appears to create factorizations that have a slightly better reconstruction - // (<1%) compared picking elements uniformly at random in [0,1]. - val factor = Array.fill(rank)(abs(rand.nextGaussian())) - val norm = sqrt(factor.map(x => x * x).sum) - factor.map(x => x / norm) - } - - /** - * Compute the user feature vectors given the current products (or vice-versa). This first joins - * the products with their out-links to generate a set of messages to each destination block - * (specifically, the features for the products that user block cares about), then groups these - * by destination and joins them with the in-link info to figure out how to update each user. - * It returns an RDD of new feature vectors for each user block. - */ - private def updateFeatures( - numUserBlocks: Int, - products: RDD[(Int, Array[Array[Double]])], - productOutLinks: RDD[(Int, OutLinkBlock)], - userInLinks: RDD[(Int, InLinkBlock)], - rank: Int, - lambda: Double, - alpha: Double, - YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = { - productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) => - val toSend = Array.fill(numUserBlocks)(new ArrayBuffer[Array[Double]]) - for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numUserBlocks) { - if (outLinkBlock.shouldSend(p)(userBlock)) { - toSend(userBlock) += factors(p) - } - } - toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) } - }.groupByKey(new HashPartitioner(numUserBlocks)) - .join(userInLinks) - .mapValues{ case (messages, inLinkBlock) => - updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY) - } - } - - /** - * Compute the new feature vectors for a block of the users matrix given the list of factors - * it received from each product and its InLinkBlock. - */ - private def updateBlock(messages: Iterable[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, - rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]]) - : Array[Array[Double]] = - { - // Sort the incoming block factor messages by block ID and make them an array - val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]] - val numProductBlocks = blockFactors.length - val numUsers = inLinkBlock.elementIds.length - - // We'll sum up the XtXes using vectors that represent only the lower-triangular part, since - // the matrices are symmetric - val triangleSize = rank * (rank + 1) / 2 - val userXtX = Array.fill(numUsers)(DoubleMatrix.zeros(triangleSize)) - val userXy = Array.fill(numUsers)(DoubleMatrix.zeros(rank)) - - // Some temp variables to avoid memory allocation - val tempXtX = DoubleMatrix.zeros(triangleSize) - val fullXtX = DoubleMatrix.zeros(rank, rank) - - // Count the number of ratings each user gives to provide user-specific regularization - val numRatings = Array.fill(numUsers)(0) - - // Compute the XtX and Xy values for each user by adding products it rated in each product - // block - for (productBlock <- 0 until numProductBlocks) { - var p = 0 - while (p < blockFactors(productBlock).length) { - val x = wrapDoubleArray(blockFactors(productBlock)(p)) - tempXtX.fill(0.0) - dspr(1.0, x, tempXtX) - val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p) - if (implicitPrefs) { - var i = 0 - while (i < us.length) { - numRatings(us(i)) += 1 - // Extension to the original paper to handle rs(i) < 0. confidence is a function - // of |rs(i)| instead so that it is never negative: - val confidence = 1 + alpha * abs(rs(i)) - SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i))) - // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i) - // means we try to reconstruct 0. We add terms only where P = 1, so, term below - // is now only added for rs(i) > 0: - if (rs(i) > 0) { - SimpleBlas.axpy(confidence, x, userXy(us(i))) - } - i += 1 - } - } else { - var i = 0 - while (i < us.length) { - numRatings(us(i)) += 1 - userXtX(us(i)).addi(tempXtX) - SimpleBlas.axpy(rs(i), x, userXy(us(i))) - i += 1 - } - } - p += 1 - } - } - - val ws = if (nonnegative) NNLS.createWorkspace(rank) else null - - // Solve the least-squares problem for each user and return the new feature vectors - Array.range(0, numUsers).map { index => - // Compute the full XtX matrix from the lower-triangular part we got above - fillFullMatrix(userXtX(index), fullXtX) - // Add regularization - val regParam = numRatings(index) * lambda - var i = 0 - while (i < rank) { - fullXtX.data(i * rank + i) += regParam - i += 1 - } - // Solve the resulting matrix, which is symmetric and positive-definite - if (implicitPrefs) { - solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index), ws) - } else { - solveLeastSquares(fullXtX, userXy(index), ws) - } - } - } - - /** - * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject - * to nonnegativity constraints if `nonnegative` is true. - */ - private def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix, - ws: NNLS.Workspace): Array[Double] = { - if (!nonnegative) { - Solve.solvePositive(ata, atb).data - } else { - NNLS.solve(ata, atb, ws) - } - } - - /** - * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square - * matrix that it represents, storing it into destMatrix. - */ - private def fillFullMatrix(triangularMatrix: DoubleMatrix, destMatrix: DoubleMatrix) { - val rank = destMatrix.rows - var i = 0 - var pos = 0 - while (i < rank) { - var j = 0 - while (j <= i) { - destMatrix.data(i*rank + j) = triangularMatrix.data(pos) - destMatrix.data(j*rank + i) = triangularMatrix.data(pos) - pos += 1 - j += 1 - } - i += 1 - } - } -} - -/** - * Partitioner for ALS. - */ -private[recommendation] class ALSPartitioner(override val numPartitions: Int) extends Partitioner { - override def getPartition(key: Any): Int = { - Utils.nonNegativeMod(byteswap32(key.asInstanceOf[Int]), numPartitions) - } - - override def equals(obj: Any): Boolean = { - obj match { - case p: ALSPartitioner => - this.numPartitions == p.numPartitions - case _ => - false - } - } } /** @@ -834,120 +410,4 @@ object ALS { : MatrixFactorizationModel = { trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) } - - /** - * :: DeveloperApi :: - * Statistics of a block in ALS computation. - * - * @param category type of this block, "user" or "product" - * @param index index of this block - * @param count number of users or products inside this block, the same as the number of - * least-squares problems to solve on this block in each iteration - * @param numRatings total number of ratings inside this block, the same as the number of outer - * products we need to make on this block in each iteration - * @param numInLinks total number of incoming links, the same as the number of vectors to retrieve - * before each iteration - * @param numOutLinks total number of outgoing links, the same as the number of vectors to send - * for the next iteration - */ - @DeveloperApi - case class BlockStats( - category: String, - index: Int, - count: Long, - numRatings: Long, - numInLinks: Long, - numOutLinks: Long) - - /** - * :: DeveloperApi :: - * Given an RDD of ratings, number of user blocks, and number of product blocks, computes the - * statistics of each block in ALS computation. This is useful for estimating cost and diagnosing - * load balance. - * - * @param ratings an RDD of ratings - * @param numUserBlocks number of user blocks - * @param numProductBlocks number of product blocks - * @return statistics of user blocks and product blocks - */ - @DeveloperApi - def analyzeBlocks( - ratings: RDD[Rating], - numUserBlocks: Int, - numProductBlocks: Int): Array[BlockStats] = { - - val userPartitioner = new ALSPartitioner(numUserBlocks) - val productPartitioner = new ALSPartitioner(numProductBlocks) - - val ratingsByUserBlock = ratings.map { rating => - (userPartitioner.getPartition(rating.user), rating) - } - val ratingsByProductBlock = ratings.map { rating => - (productPartitioner.getPartition(rating.product), - Rating(rating.product, rating.user, rating.rating)) - } - - val als = new ALS() - val (userIn, userOut) = - als.makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, userPartitioner) - val (prodIn, prodOut) = - als.makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, productPartitioner) - - def sendGrid(outLinks: RDD[(Int, OutLinkBlock)]): Map[(Int, Int), Long] = { - outLinks.map { x => - val grid = new mutable.HashMap[(Int, Int), Long]() - val uPartition = x._1 - x._2.shouldSend.foreach { ss => - ss.foreach { pPartition => - val pair = (uPartition, pPartition) - grid.put(pair, grid.getOrElse(pair, 0L) + 1L) - } - } - grid - }.reduce { (grid1, grid2) => - grid2.foreach { x => - grid1.put(x._1, grid1.getOrElse(x._1, 0L) + x._2) - } - grid1 - }.toMap - } - - val userSendGrid = sendGrid(userOut) - val prodSendGrid = sendGrid(prodOut) - - val userInbound = new Array[Long](numUserBlocks) - val prodInbound = new Array[Long](numProductBlocks) - val userOutbound = new Array[Long](numUserBlocks) - val prodOutbound = new Array[Long](numProductBlocks) - - for (u <- 0 until numUserBlocks; p <- 0 until numProductBlocks) { - userOutbound(u) += userSendGrid.getOrElse((u, p), 0L) - prodInbound(p) += userSendGrid.getOrElse((u, p), 0L) - userInbound(u) += prodSendGrid.getOrElse((p, u), 0L) - prodOutbound(p) += prodSendGrid.getOrElse((p, u), 0L) - } - - val userCounts = userOut.mapValues(x => x.elementIds.length).collectAsMap() - val prodCounts = prodOut.mapValues(x => x.elementIds.length).collectAsMap() - - val userRatings = countRatings(userIn) - val prodRatings = countRatings(prodIn) - - val userStats = Array.tabulate(numUserBlocks)( - u => BlockStats("user", u, userCounts(u), userRatings(u), userInbound(u), userOutbound(u))) - val productStatus = Array.tabulate(numProductBlocks)( - p => BlockStats("product", p, prodCounts(p), prodRatings(p), prodInbound(p), prodOutbound(p))) - - (userStats ++ productStatus).toArray - } - - private def countRatings(inLinks: RDD[(Int, InLinkBlock)]): Map[Int, Long] = { - inLinks.mapValues { ilb => - var numRatings = 0L - ilb.ratingsForBlock.foreach { ar => - ar.foreach { p => numRatings += p._1.length } - } - numRatings - }.collectAsMap().toMap - } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index ee08c3c327..acc447742b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -414,7 +414,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { val (training, test) = genExplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01) for ((numUserBlocks, numItemBlocks) <- Seq((1, 1), (1, 2), (2, 1), (2, 2))) { - testALS(training, test, maxIter = 4, rank = 2, regParam = 0.01, targetRMSE = 0.03, + testALS(training, test, maxIter = 4, rank = 3, regParam = 0.01, targetRMSE = 0.03, numUserBlocks = numUserBlocks, numItemBlocks = numItemBlocks) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index e9fc37e000..8775c0ca9d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -24,9 +24,7 @@ import scala.util.Random import org.scalatest.FunSuite import org.jblas.DoubleMatrix -import org.apache.spark.SparkContext._ import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.mllib.recommendation.ALS.BlockStats import org.apache.spark.storage.StorageLevel object ALSSuite { @@ -189,22 +187,6 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext { testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, -1, false) } - test("analyze one user block and one product block") { - val localRatings = Seq( - Rating(0, 100, 1.0), - Rating(0, 101, 2.0), - Rating(0, 102, 3.0), - Rating(1, 102, 4.0), - Rating(2, 103, 5.0)) - val ratings = sc.makeRDD(localRatings, 2) - val stats = ALS.analyzeBlocks(ratings, 1, 1) - assert(stats.size === 2) - assert(stats(0) === BlockStats("user", 0, 3, 5, 4, 3)) - assert(stats(1) === BlockStats("product", 0, 4, 5, 3, 4)) - } - - // TODO: add tests for analyzing multiple user/product blocks - /** * Test if we can correctly factorize R = U * P where U and P are of known rank. * -- cgit v1.2.3