author     Xiangrui Meng <meng@databricks.com>    2015-02-02 23:49:09 -0800
committer  Xiangrui Meng <meng@databricks.com>    2015-02-02 23:49:09 -0800
commit     0cc7b88c99405db99bc4c3d66f5409e5da0e3c6e (patch)
tree       954be2e5603e919a268ea9d68262586b2eb77ed4 /mllib
parent     b8ebebeaaa259be4fcddf65b3280d23165b011a1 (diff)
[SPARK-5536] replace old ALS implementation by the new one
The only issue is that `analyzeBlocks` is removed, even though it was marked as a developer API. I left the other tests in `ALSSuite` under `spark.mllib` unchanged so that they verify the new implementation behaves the same.

CC: srowen coderxiang

Author: Xiangrui Meng <meng@databricks.com>

Closes #4321 from mengxr/SPARK-5536 and squashes the following commits:

5a3cee8 [Xiangrui Meng] update python tests that are too strict
e840acf [Xiangrui Meng] ignore scala style check for ALS.train
e9a721c [Xiangrui Meng] update mima excludes
9ee6a36 [Xiangrui Meng] merge master
9a8aeac [Xiangrui Meng] update tests
d8c3271 [Xiangrui Meng] remove analyzeBlocks
d68eee7 [Xiangrui Meng] add checkpoint to new ALS
22a56f8 [Xiangrui Meng] wrap old ALS
c387dff [Xiangrui Meng] support random seed
3bdf24b [Xiangrui Meng] make storage level configurable in the new ALS
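As a quick orientation before the diff: the new knobs (random seed, configurable storage levels, periodic checkpointing) all surface through the stable `spark.mllib` entry point. A minimal usage sketch with made-up data, paths, and parameter values, assuming an existing SparkContext `sc`:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.storage.StorageLevel

    // Hypothetical setup: with a checkpoint dir set, the new implementation
    // checkpoints the factor RDDs every third iteration to keep lineage short.
    sc.setCheckpointDir("/tmp/als-checkpoints")

    val ratings = sc.parallelize(Seq(
      Rating(0, 10, 4.0), Rating(0, 11, 1.0),
      Rating(1, 10, 1.0), Rating(1, 11, 5.0)))

    val model = new ALS()
      .setRank(2)
      .setIterations(10)
      .setLambda(0.01)
      .setSeed(42L) // reproducible factors, newly supported
      .setIntermediateRDDStorageLevel(StorageLevel.MEMORY_AND_DISK) // must not be NONE
      .run(ratings)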
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala          69
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala      600
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala      2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala  18
4 files changed, 76 insertions(+), 613 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 82d21d5e4c..511cb2fe40 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -22,6 +22,7 @@ import java.{util => ju}
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Sorting
+import scala.util.hashing.byteswap64
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import com.github.fommil.netlib.LAPACK.{getInstance => lapack}
@@ -37,6 +38,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType}
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
import org.apache.spark.util.random.XORShiftRandom
@@ -412,7 +414,7 @@ object ALS extends Logging {
/**
* Implementation of the ALS algorithm.
*/
- def train[ID: ClassTag](
+ def train[ID: ClassTag]( // scalastyle:ignore
ratings: RDD[Rating[ID]],
rank: Int = 10,
numUserBlocks: Int = 10,
@@ -421,34 +423,47 @@ object ALS extends Logging {
regParam: Double = 1.0,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
- nonnegative: Boolean = false)(
+ nonnegative: Boolean = false,
+ intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
+ finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
+ seed: Long = 0L)(
implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
+ require(intermediateRDDStorageLevel != StorageLevel.NONE,
+ "ALS is not designed to run without persisting intermediate RDDs.")
+ val sc = ratings.sparkContext
val userPart = new HashPartitioner(numUserBlocks)
val itemPart = new HashPartitioner(numItemBlocks)
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
- val blockRatings = partitionRatings(ratings, userPart, itemPart).cache()
- val (userInBlocks, userOutBlocks) = makeBlocks("user", blockRatings, userPart, itemPart)
+ val blockRatings = partitionRatings(ratings, userPart, itemPart)
+ .persist(intermediateRDDStorageLevel)
+ val (userInBlocks, userOutBlocks) =
+ makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
// materialize blockRatings and user blocks
userOutBlocks.count()
val swappedBlockRatings = blockRatings.map {
case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
}
- val (itemInBlocks, itemOutBlocks) = makeBlocks("item", swappedBlockRatings, itemPart, userPart)
+ val (itemInBlocks, itemOutBlocks) =
+ makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
// materialize item blocks
itemOutBlocks.count()
- var userFactors = initialize(userInBlocks, rank)
- var itemFactors = initialize(itemInBlocks, rank)
+ val seedGen = new XORShiftRandom(seed)
+ var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
+ var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
if (implicitPrefs) {
for (iter <- 1 to maxIter) {
- userFactors.setName(s"userFactors-$iter").persist()
+ userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
val previousItemFactors = itemFactors
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
userLocalIndexEncoder, implicitPrefs, alpha, solver)
previousItemFactors.unpersist()
- itemFactors.setName(s"itemFactors-$iter").persist()
+ if (sc.checkpointDir.isDefined && (iter % 3 == 0)) {
+ itemFactors.checkpoint()
+ }
+ itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
val previousUserFactors = userFactors
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, implicitPrefs, alpha, solver)
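The `iter % 3 == 0` checkpoint added above bounds the RDD lineage, which otherwise grows with every iteration and can make the DAG (and fault recovery) very deep on long runs. A self-contained sketch of the pattern; the update step is a stand-in, not ALS:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object PeriodicCheckpointSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ckpt").setMaster("local[2]"))
        sc.setCheckpointDir("/tmp/ckpt") // without this, checkpoint() throws
        var factors = sc.parallelize(0 until 4).map(i => (i, Array.fill(2)(1.0f)))
        for (iter <- 1 to 9) {
          val previous = factors
          factors = previous.map { case (id, f) => (id, f.map(_ * 1.01f)) } // fake half-step
          if (sc.checkpointDir.isDefined && iter % 3 == 0) {
            factors.checkpoint() // same guard and cadence as the diff above
          }
          factors.persist(StorageLevel.MEMORY_AND_DISK)
          previous.unpersist()
        }
        println(factors.count()) // materialize the final lineage
        sc.stop()
      }
    }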
@@ -467,21 +482,23 @@ object ALS extends Logging {
.join(userFactors)
.values
.setName("userFactors")
- .cache()
- userIdAndFactors.count()
- itemFactors.unpersist()
+ .persist(finalRDDStorageLevel)
val itemIdAndFactors = itemInBlocks
.mapValues(_.srcIds)
.join(itemFactors)
.values
.setName("itemFactors")
- .cache()
- itemIdAndFactors.count()
- userInBlocks.unpersist()
- userOutBlocks.unpersist()
- itemInBlocks.unpersist()
- itemOutBlocks.unpersist()
- blockRatings.unpersist()
+ .persist(finalRDDStorageLevel)
+ if (finalRDDStorageLevel != StorageLevel.NONE) {
+ userIdAndFactors.count()
+ itemFactors.unpersist()
+ itemIdAndFactors.count()
+ userInBlocks.unpersist()
+ userOutBlocks.unpersist()
+ itemInBlocks.unpersist()
+ itemOutBlocks.unpersist()
+ blockRatings.unpersist()
+ }
val userOutput = userIdAndFactors.flatMap { case (ids, factors) =>
ids.view.zip(factors)
}
@@ -546,14 +563,15 @@ object ALS extends Logging {
*/
private def initialize[ID](
inBlocks: RDD[(Int, InBlock[ID])],
- rank: Int): RDD[(Int, FactorBlock)] = {
+ rank: Int,
+ seed: Long): RDD[(Int, FactorBlock)] = {
// Choose a unit vector uniformly at random from the unit sphere, but from the
// "first quadrant" where all elements are nonnegative. This can be done by choosing
// elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
// This appears to create factorizations that have a slightly better reconstruction
(<1%) compared to picking elements uniformly at random in [0,1].
inBlocks.map { case (srcBlockId, inBlock) =>
- val random = new XORShiftRandom(srcBlockId)
+ val random = new XORShiftRandom(byteswap64(seed ^ srcBlockId))
val factors = Array.fill(inBlock.srcIds.length) {
val factor = Array.fill(rank)(random.nextGaussian().toFloat)
val nrm = blas.snrm2(rank, factor, 1)
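The functional change in this hunk is the seed: each block's RNG is now seeded with `byteswap64(seed ^ srcBlockId)` rather than the bare block id, so a fixed seed stays reproducible while different seeds give genuinely different factorizations. A pure-Scala sketch of the initialization idea described in the comment (mirroring the old mllib `randomFactor`, with `scala.util.Random` standing in for Spark's internal XORShiftRandom):

    import scala.math.{abs, sqrt}
    import scala.util.Random
    import scala.util.hashing.byteswap64

    // Sketch: one nonnegative unit vector per source id, deterministically
    // seeded per (seed, block) pair.
    def initFactor(rank: Int, seed: Long, srcBlockId: Int): Array[Float] = {
      val random = new Random(byteswap64(seed ^ srcBlockId))
      val factor = Array.fill(rank)(abs(random.nextGaussian()).toFloat)
      val nrm = sqrt(factor.map(x => x.toDouble * x).sum).toFloat
      factor.map(_ / nrm)
    }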
@@ -877,7 +895,8 @@ object ALS extends Logging {
prefix: String,
ratingBlocks: RDD[((Int, Int), RatingBlock[ID])],
srcPart: Partitioner,
- dstPart: Partitioner)(
+ dstPart: Partitioner,
+ storageLevel: StorageLevel)(
implicit srcOrd: Ordering[ID]): (RDD[(Int, InBlock[ID])], RDD[(Int, OutBlock)]) = {
val inBlocks = ratingBlocks.map {
case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) =>
@@ -914,7 +933,8 @@ object ALS extends Logging {
builder.add(dstBlockId, srcIds, dstLocalIndices, ratings)
}
builder.build().compress()
- }.setName(prefix + "InBlocks").cache()
+ }.setName(prefix + "InBlocks")
+ .persist(storageLevel)
val outBlocks = inBlocks.mapValues { case InBlock(srcIds, dstPtrs, dstEncodedIndices, _) =>
val encoder = new LocalIndexEncoder(dstPart.numPartitions)
val activeIds = Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int])
@@ -936,7 +956,8 @@ object ALS extends Logging {
activeIds.map { x =>
x.result()
}
- }.setName(prefix + "OutBlocks").cache()
+ }.setName(prefix + "OutBlocks")
+ .persist(storageLevel)
(inBlocks, outBlocks)
}
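With that, the new `ALS.train` in `spark.ml` exposes everything the old implementation configured through setters, plus the new storage-level and seed parameters. A hypothetical direct call mirroring the signature added above, assuming `ratings: RDD[ALS.Rating[Int]]` and that the method is reachable from the calling code:

    import org.apache.spark.ml.recommendation.ALS
    import org.apache.spark.storage.StorageLevel

    // Hypothetical: argument names and defaults follow the diff above.
    val (userFactors, itemFactors) = ALS.train(
      ratings,
      rank = 10,
      numUserBlocks = 10,
      numItemBlocks = 10,
      maxIter = 10,
      regParam = 1.0,
      implicitPrefs = false,
      alpha = 1.0,
      nonnegative = false,
      intermediateRDDStorageLevel = StorageLevel.MEMORY_AND_DISK,
      finalRDDStorageLevel = StorageLevel.MEMORY_AND_DISK,
      seed = 11L)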
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index a5ffe888ca..f4f51f2ac5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -17,46 +17,12 @@
package org.apache.spark.mllib.recommendation
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.math.{abs, sqrt}
-import scala.util.{Random, Sorting}
-import scala.util.hashing.byteswap32
-
-import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
-
-import org.apache.spark.{HashPartitioner, Logging, Partitioner}
-import org.apache.spark.SparkContext._
+import org.apache.spark.Logging
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.mllib.optimization.NNLS
+import org.apache.spark.ml.recommendation.{ALS => NewALS}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
-
-/**
- * Out-link information for a user or product block. This includes the original user/product IDs
- * of the elements within this block, and the list of destination blocks that each user or
- * product will need to send its feature vector to.
- */
-private[recommendation]
-case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[mutable.BitSet])
-
-
-/**
- * In-link information for a user (or product) block. This includes the original user/product IDs
- * of the elements within this block, as well as an array of indices and ratings that specify
- * which user in the block will be rated by which products from each product block (or vice-versa).
- * Specifically, if this InLinkBlock is for users, ratingsForBlock(b)(i) will contain two arrays,
- * indices and ratings, for the i'th product that will be sent to us by product block b (call this
- * P). These arrays represent the users that product P had ratings for (by their index in this
- * block), as well as the corresponding rating for each one. We can thus use this information when
- * we get product block b's message to update the corresponding users.
- */
-private[recommendation] case class InLinkBlock(
- elementIds: Array[Int], ratingsForBlock: Array[Array[(Array[Int], Array[Double])]])
-
/**
* :: Experimental ::
@@ -201,6 +167,8 @@ class ALS private (
*/
@DeveloperApi
def setIntermediateRDDStorageLevel(storageLevel: StorageLevel): this.type = {
+ require(storageLevel != StorageLevel.NONE,
+ "ALS is not designed to run without persisting intermediate RDDs.")
this.intermediateRDDStorageLevel = storageLevel
this
}
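This guard makes misconfiguration fail at setup time instead of producing a pathologically slow job. For illustration (hypothetical REPL session):

    import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.storage.StorageLevel

    new ALS().setIntermediateRDDStorageLevel(StorageLevel.NONE)
    // java.lang.IllegalArgumentException: requirement failed:
    //   ALS is not designed to run without persisting intermediate RDDs.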
@@ -236,431 +204,39 @@ class ALS private (
this.numProductBlocks
}
- val userPartitioner = new ALSPartitioner(numUserBlocks)
- val productPartitioner = new ALSPartitioner(numProductBlocks)
-
- val ratingsByUserBlock = ratings.map { rating =>
- (userPartitioner.getPartition(rating.user), rating)
- }
- val ratingsByProductBlock = ratings.map { rating =>
- (productPartitioner.getPartition(rating.product),
- Rating(rating.product, rating.user, rating.rating))
- }
-
- val (userInLinks, userOutLinks) =
- makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, productPartitioner)
- val (productInLinks, productOutLinks) =
- makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, userPartitioner)
- userInLinks.setName("userInLinks")
- userOutLinks.setName("userOutLinks")
- productInLinks.setName("productInLinks")
- productOutLinks.setName("productOutLinks")
-
- // Initialize user and product factors randomly, but use a deterministic seed for each
- // partition so that fault recovery works
- val seedGen = new Random(seed)
- val seed1 = seedGen.nextInt()
- val seed2 = seedGen.nextInt()
- var users = userOutLinks.mapPartitionsWithIndex { (index, itr) =>
- val rand = new Random(byteswap32(seed1 ^ index))
- itr.map { case (x, y) =>
- (x, y.elementIds.map(_ => randomFactor(rank, rand)))
- }
- }
- var products = productOutLinks.mapPartitionsWithIndex { (index, itr) =>
- val rand = new Random(byteswap32(seed2 ^ index))
- itr.map { case (x, y) =>
- (x, y.elementIds.map(_ => randomFactor(rank, rand)))
- }
- }
-
- if (implicitPrefs) {
- for (iter <- 1 to iterations) {
- // perform ALS update
- logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
- // Persist users because it will be called twice.
- users.setName(s"users-$iter").persist()
- val YtY = Some(sc.broadcast(computeYtY(users)))
- val previousProducts = products
- products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
- rank, lambda, alpha, YtY)
- previousProducts.unpersist()
- logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
- if (sc.checkpointDir.isDefined && (iter % 3 == 0)) {
- products.checkpoint()
- }
- products.setName(s"products-$iter").persist()
- val XtX = Some(sc.broadcast(computeYtY(products)))
- val previousUsers = users
- users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
- rank, lambda, alpha, XtX)
- previousUsers.unpersist()
- }
- } else {
- for (iter <- 1 to iterations) {
- // perform ALS update
- logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
- products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
- rank, lambda, alpha, YtY = None)
- if (sc.checkpointDir.isDefined && (iter % 3 == 0)) {
- products.checkpoint()
- }
- products.setName(s"products-$iter")
- logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
- users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
- rank, lambda, alpha, YtY = None)
- users.setName(s"users-$iter")
- }
+ val (floatUserFactors, floatProdFactors) = NewALS.train[Int](
+ ratings = ratings.map(r => NewALS.Rating(r.user, r.product, r.rating.toFloat)),
+ rank = rank,
+ numUserBlocks = numUserBlocks,
+ numItemBlocks = numProductBlocks,
+ maxIter = iterations,
+ regParam = lambda,
+ implicitPrefs = implicitPrefs,
+ alpha = alpha,
+ nonnegative = nonnegative,
+ intermediateRDDStorageLevel = intermediateRDDStorageLevel,
+ finalRDDStorageLevel = StorageLevel.NONE,
+ seed = seed)
+
+ val userFactors = floatUserFactors
+ .mapValues(_.map(_.toDouble))
+ .setName("users")
+ .persist(finalRDDStorageLevel)
+ val prodFactors = floatProdFactors
+ .mapValues(_.map(_.toDouble))
+ .setName("products")
+ .persist(finalRDDStorageLevel)
+ if (finalRDDStorageLevel != StorageLevel.NONE) {
+ userFactors.count()
+ prodFactors.count()
}
-
- // The last `products` will be used twice. One to generate the last `users` and the other to
- // generate `productsOut`. So we cache it for better performance.
- products.setName("products").persist()
-
- // Flatten and cache the two final RDDs to un-block them
- val usersOut = unblockFactors(users, userOutLinks)
- val productsOut = unblockFactors(products, productOutLinks)
-
- usersOut.setName("usersOut").persist(finalRDDStorageLevel)
- productsOut.setName("productsOut").persist(finalRDDStorageLevel)
-
- // Materialize usersOut and productsOut.
- usersOut.count()
- productsOut.count()
-
- products.unpersist()
-
- // Clean up.
- userInLinks.unpersist()
- userOutLinks.unpersist()
- productInLinks.unpersist()
- productOutLinks.unpersist()
-
- new MatrixFactorizationModel(rank, usersOut, productsOut)
+ new MatrixFactorizationModel(rank, userFactors, prodFactors)
}
/**
* Java-friendly version of [[ALS.run]].
*/
def run(ratings: JavaRDD[Rating]): MatrixFactorizationModel = run(ratings.rdd)
-
- /**
- * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors
- * for each user (or product), in a distributed fashion.
- *
- * @param factors the (block-distributed) user or product factor vectors
- * @return YtY - whose value is only used in the implicit preference model
- */
- private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
- val n = rank * (rank + 1) / 2
- val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
- Y.foreach(y => dspr(1.0, wrapDoubleArray(y), L))
- L
- }, combOp = (L1, L2) => {
- L1.addi(L2)
- })
- val YtY = new DoubleMatrix(rank, rank)
- fillFullMatrix(LYtY, YtY)
- YtY
- }
-
- /**
- * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR.
- *
- * @param L the lower triangular part of the matrix packed in an array (row major)
- */
- private def dspr(alpha: Double, x: DoubleMatrix, L: DoubleMatrix) = {
- val n = x.length
- var i = 0
- var j = 0
- var idx = 0
- var axi = 0.0
- val xd = x.data
- val Ld = L.data
- while (i < n) {
- axi = alpha * xd(i)
- j = 0
- while (j <= i) {
- Ld(idx) += axi * xd(j)
- j += 1
- idx += 1
- }
- i += 1
- }
- }
-
- /**
- * Wrap a double array in a DoubleMatrix without creating garbage.
- * This is a temporary fix for jblas 1.2.3; it should be safe to move back to the
- * DoubleMatrix(double[]) constructor come jblas 1.2.4.
- */
- private def wrapDoubleArray(v: Array[Double]): DoubleMatrix = {
- new DoubleMatrix(v.length, 1, v: _*)
- }
-
- /**
- * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
- */
- private def unblockFactors(
- blockedFactors: RDD[(Int, Array[Array[Double]])],
- outLinks: RDD[(Int, OutLinkBlock)]): RDD[(Int, Array[Double])] = {
- blockedFactors.join(outLinks).flatMap { case (b, (factors, outLinkBlock)) =>
- for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
- }
- }
-
- /**
- * Make the out-links table for a block of the users (or products) dataset given the list of
- * (user, product, rating) values for the users in that block (or the opposite for products).
- */
- private def makeOutLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
- productPartitioner: Partitioner): OutLinkBlock = {
- val userIds = ratings.map(_.user).distinct.sorted
- val numUsers = userIds.length
- val userIdToPos = userIds.zipWithIndex.toMap
- val shouldSend = Array.fill(numUsers)(new mutable.BitSet(numProductBlocks))
- for (r <- ratings) {
- shouldSend(userIdToPos(r.user))(productPartitioner.getPartition(r.product)) = true
- }
- OutLinkBlock(userIds, shouldSend)
- }
-
- /**
- * Make the in-links table for a block of the users (or products) dataset given a list of
- * (user, product, rating) values for the users in that block (or the opposite for products).
- */
- private def makeInLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
- productPartitioner: Partitioner): InLinkBlock = {
- val userIds = ratings.map(_.user).distinct.sorted
- val userIdToPos = userIds.zipWithIndex.toMap
- // Split out our ratings by product block
- val blockRatings = Array.fill(numProductBlocks)(new ArrayBuffer[Rating])
- for (r <- ratings) {
- blockRatings(productPartitioner.getPartition(r.product)) += r
- }
- val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numProductBlocks)
- for (productBlock <- 0 until numProductBlocks) {
- // Create an array of (product, Seq(Rating)) ratings
- val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
- // Sort them by product ID
- val ordering = new Ordering[(Int, ArrayBuffer[Rating])] {
- def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int =
- a._1 - b._1
- }
- Sorting.quickSort(groupedRatings)(ordering)
- // Translate the user IDs to indices based on userIdToPos
- ratingsForBlock(productBlock) = groupedRatings.map { case (p, rs) =>
- (rs.view.map(r => userIdToPos(r.user)).toArray, rs.view.map(_.rating).toArray)
- }
- }
- InLinkBlock(userIds, ratingsForBlock)
- }
-
- /**
- * Make RDDs of InLinkBlocks and OutLinkBlocks given an RDD of (blockId, (u, p, r)) values for
- * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
- * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
- */
- private def makeLinkRDDs(
- numUserBlocks: Int,
- numProductBlocks: Int,
- ratingsByUserBlock: RDD[(Int, Rating)],
- productPartitioner: Partitioner): (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) = {
- val grouped = ratingsByUserBlock.partitionBy(new HashPartitioner(numUserBlocks))
- val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
- val ratings = elements.map(_._2).toArray
- val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner)
- val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner)
- Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
- }, preservesPartitioning = true)
- val inLinks = links.mapValues(_._1)
- val outLinks = links.mapValues(_._2)
- inLinks.persist(intermediateRDDStorageLevel)
- outLinks.persist(intermediateRDDStorageLevel)
- (inLinks, outLinks)
- }
-
- /**
- * Make a random factor vector with the given random.
- */
- private def randomFactor(rank: Int, rand: Random): Array[Double] = {
- // Choose a unit vector uniformly at random from the unit sphere, but from the
- // "first quadrant" where all elements are nonnegative. This can be done by choosing
- // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
- // This appears to create factorizations that have a slightly better reconstruction
- // (<1%) compared picking elements uniformly at random in [0,1].
- val factor = Array.fill(rank)(abs(rand.nextGaussian()))
- val norm = sqrt(factor.map(x => x * x).sum)
- factor.map(x => x / norm)
- }
-
- /**
- * Compute the user feature vectors given the current products (or vice-versa). This first joins
- * the products with their out-links to generate a set of messages to each destination block
- * (specifically, the features for the products that user block cares about), then groups these
- * by destination and joins them with the in-link info to figure out how to update each user.
- * It returns an RDD of new feature vectors for each user block.
- */
- private def updateFeatures(
- numUserBlocks: Int,
- products: RDD[(Int, Array[Array[Double]])],
- productOutLinks: RDD[(Int, OutLinkBlock)],
- userInLinks: RDD[(Int, InLinkBlock)],
- rank: Int,
- lambda: Double,
- alpha: Double,
- YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
- productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
- val toSend = Array.fill(numUserBlocks)(new ArrayBuffer[Array[Double]])
- for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numUserBlocks) {
- if (outLinkBlock.shouldSend(p)(userBlock)) {
- toSend(userBlock) += factors(p)
- }
- }
- toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
- }.groupByKey(new HashPartitioner(numUserBlocks))
- .join(userInLinks)
- .mapValues{ case (messages, inLinkBlock) =>
- updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
- }
- }
-
- /**
- * Compute the new feature vectors for a block of the users matrix given the list of factors
- * it received from each product and its InLinkBlock.
- */
- private def updateBlock(messages: Iterable[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
- rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
- : Array[Array[Double]] =
- {
- // Sort the incoming block factor messages by block ID and make them an array
- val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
- val numProductBlocks = blockFactors.length
- val numUsers = inLinkBlock.elementIds.length
-
- // We'll sum up the XtXes using vectors that represent only the lower-triangular part, since
- // the matrices are symmetric
- val triangleSize = rank * (rank + 1) / 2
- val userXtX = Array.fill(numUsers)(DoubleMatrix.zeros(triangleSize))
- val userXy = Array.fill(numUsers)(DoubleMatrix.zeros(rank))
-
- // Some temp variables to avoid memory allocation
- val tempXtX = DoubleMatrix.zeros(triangleSize)
- val fullXtX = DoubleMatrix.zeros(rank, rank)
-
- // Count the number of ratings each user gives to provide user-specific regularization
- val numRatings = Array.fill(numUsers)(0)
-
- // Compute the XtX and Xy values for each user by adding products it rated in each product
- // block
- for (productBlock <- 0 until numProductBlocks) {
- var p = 0
- while (p < blockFactors(productBlock).length) {
- val x = wrapDoubleArray(blockFactors(productBlock)(p))
- tempXtX.fill(0.0)
- dspr(1.0, x, tempXtX)
- val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
- if (implicitPrefs) {
- var i = 0
- while (i < us.length) {
- numRatings(us(i)) += 1
- // Extension to the original paper to handle rs(i) < 0. confidence is a function
- // of |rs(i)| instead so that it is never negative:
- val confidence = 1 + alpha * abs(rs(i))
- SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
- // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
- // means we try to reconstruct 0. We add terms only where P = 1, so, term below
- // is now only added for rs(i) > 0:
- if (rs(i) > 0) {
- SimpleBlas.axpy(confidence, x, userXy(us(i)))
- }
- i += 1
- }
- } else {
- var i = 0
- while (i < us.length) {
- numRatings(us(i)) += 1
- userXtX(us(i)).addi(tempXtX)
- SimpleBlas.axpy(rs(i), x, userXy(us(i)))
- i += 1
- }
- }
- p += 1
- }
- }
-
- val ws = if (nonnegative) NNLS.createWorkspace(rank) else null
-
- // Solve the least-squares problem for each user and return the new feature vectors
- Array.range(0, numUsers).map { index =>
- // Compute the full XtX matrix from the lower-triangular part we got above
- fillFullMatrix(userXtX(index), fullXtX)
- // Add regularization
- val regParam = numRatings(index) * lambda
- var i = 0
- while (i < rank) {
- fullXtX.data(i * rank + i) += regParam
- i += 1
- }
- // Solve the resulting matrix, which is symmetric and positive-definite
- if (implicitPrefs) {
- solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index), ws)
- } else {
- solveLeastSquares(fullXtX, userXy(index), ws)
- }
- }
- }
-
- /**
- * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject
- * to nonnegativity constraints if `nonnegative` is true.
- */
- private def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix,
- ws: NNLS.Workspace): Array[Double] = {
- if (!nonnegative) {
- Solve.solvePositive(ata, atb).data
- } else {
- NNLS.solve(ata, atb, ws)
- }
- }
-
- /**
- * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
- * matrix that it represents, storing it into destMatrix.
- */
- private def fillFullMatrix(triangularMatrix: DoubleMatrix, destMatrix: DoubleMatrix) {
- val rank = destMatrix.rows
- var i = 0
- var pos = 0
- while (i < rank) {
- var j = 0
- while (j <= i) {
- destMatrix.data(i*rank + j) = triangularMatrix.data(pos)
- destMatrix.data(j*rank + i) = triangularMatrix.data(pos)
- pos += 1
- j += 1
- }
- i += 1
- }
- }
-}
-
-/**
- * Partitioner for ALS.
- */
-private[recommendation] class ALSPartitioner(override val numPartitions: Int) extends Partitioner {
- override def getPartition(key: Any): Int = {
- Utils.nonNegativeMod(byteswap32(key.asInstanceOf[Int]), numPartitions)
- }
-
- override def equals(obj: Any): Boolean = {
- obj match {
- case p: ALSPartitioner =>
- this.numPartitions == p.numPartitions
- case _ =>
- false
- }
- }
}
/**
@@ -834,120 +410,4 @@ object ALS {
: MatrixFactorizationModel = {
trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
}
-
- /**
- * :: DeveloperApi ::
- * Statistics of a block in ALS computation.
- *
- * @param category type of this block, "user" or "product"
- * @param index index of this block
- * @param count number of users or products inside this block, the same as the number of
- * least-squares problems to solve on this block in each iteration
- * @param numRatings total number of ratings inside this block, the same as the number of outer
- * products we need to make on this block in each iteration
- * @param numInLinks total number of incoming links, the same as the number of vectors to retrieve
- * before each iteration
- * @param numOutLinks total number of outgoing links, the same as the number of vectors to send
- * for the next iteration
- */
- @DeveloperApi
- case class BlockStats(
- category: String,
- index: Int,
- count: Long,
- numRatings: Long,
- numInLinks: Long,
- numOutLinks: Long)
-
- /**
- * :: DeveloperApi ::
- * Given an RDD of ratings, number of user blocks, and number of product blocks, computes the
- * statistics of each block in ALS computation. This is useful for estimating cost and diagnosing
- * load balance.
- *
- * @param ratings an RDD of ratings
- * @param numUserBlocks number of user blocks
- * @param numProductBlocks number of product blocks
- * @return statistics of user blocks and product blocks
- */
- @DeveloperApi
- def analyzeBlocks(
- ratings: RDD[Rating],
- numUserBlocks: Int,
- numProductBlocks: Int): Array[BlockStats] = {
-
- val userPartitioner = new ALSPartitioner(numUserBlocks)
- val productPartitioner = new ALSPartitioner(numProductBlocks)
-
- val ratingsByUserBlock = ratings.map { rating =>
- (userPartitioner.getPartition(rating.user), rating)
- }
- val ratingsByProductBlock = ratings.map { rating =>
- (productPartitioner.getPartition(rating.product),
- Rating(rating.product, rating.user, rating.rating))
- }
-
- val als = new ALS()
- val (userIn, userOut) =
- als.makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, userPartitioner)
- val (prodIn, prodOut) =
- als.makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, productPartitioner)
-
- def sendGrid(outLinks: RDD[(Int, OutLinkBlock)]): Map[(Int, Int), Long] = {
- outLinks.map { x =>
- val grid = new mutable.HashMap[(Int, Int), Long]()
- val uPartition = x._1
- x._2.shouldSend.foreach { ss =>
- ss.foreach { pPartition =>
- val pair = (uPartition, pPartition)
- grid.put(pair, grid.getOrElse(pair, 0L) + 1L)
- }
- }
- grid
- }.reduce { (grid1, grid2) =>
- grid2.foreach { x =>
- grid1.put(x._1, grid1.getOrElse(x._1, 0L) + x._2)
- }
- grid1
- }.toMap
- }
-
- val userSendGrid = sendGrid(userOut)
- val prodSendGrid = sendGrid(prodOut)
-
- val userInbound = new Array[Long](numUserBlocks)
- val prodInbound = new Array[Long](numProductBlocks)
- val userOutbound = new Array[Long](numUserBlocks)
- val prodOutbound = new Array[Long](numProductBlocks)
-
- for (u <- 0 until numUserBlocks; p <- 0 until numProductBlocks) {
- userOutbound(u) += userSendGrid.getOrElse((u, p), 0L)
- prodInbound(p) += userSendGrid.getOrElse((u, p), 0L)
- userInbound(u) += prodSendGrid.getOrElse((p, u), 0L)
- prodOutbound(p) += prodSendGrid.getOrElse((p, u), 0L)
- }
-
- val userCounts = userOut.mapValues(x => x.elementIds.length).collectAsMap()
- val prodCounts = prodOut.mapValues(x => x.elementIds.length).collectAsMap()
-
- val userRatings = countRatings(userIn)
- val prodRatings = countRatings(prodIn)
-
- val userStats = Array.tabulate(numUserBlocks)(
- u => BlockStats("user", u, userCounts(u), userRatings(u), userInbound(u), userOutbound(u)))
- val productStatus = Array.tabulate(numProductBlocks)(
- p => BlockStats("product", p, prodCounts(p), prodRatings(p), prodInbound(p), prodOutbound(p)))
-
- (userStats ++ productStatus).toArray
- }
-
- private def countRatings(inLinks: RDD[(Int, InLinkBlock)]): Map[Int, Long] = {
- inLinks.mapValues { ilb =>
- var numRatings = 0L
- ilb.ratingsForBlock.foreach { ar =>
- ar.foreach { p => numRatings += p._1.length }
- }
- numRatings
- }.collectAsMap().toMap
- }
}
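Since the old class is now a thin wrapper (ratings converted to Float on the way in, factors widened back to Double on the way out, with the final storage level handled by the wrapper itself), existing callers see the same public surface. A hypothetical smoke test, assuming an existing SparkContext `sc`:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    // Hypothetical toy matrix: two users, two products.
    val ratings = sc.parallelize(Seq(
      Rating(0, 0, 4.0), Rating(0, 1, 1.0),
      Rating(1, 0, 1.0), Rating(1, 1, 5.0)))
    val model = ALS.train(ratings, rank = 2, iterations = 10, lambda = 0.01)
    println(model.predict(0, 0)) // still returns a Double close to 4.0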
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index ee08c3c327..acc447742b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -414,7 +414,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging {
val (training, test) =
genExplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01)
for ((numUserBlocks, numItemBlocks) <- Seq((1, 1), (1, 2), (2, 1), (2, 2))) {
- testALS(training, test, maxIter = 4, rank = 2, regParam = 0.01, targetRMSE = 0.03,
+ testALS(training, test, maxIter = 4, rank = 3, regParam = 0.01, targetRMSE = 0.03,
numUserBlocks = numUserBlocks, numItemBlocks = numItemBlocks)
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index e9fc37e000..8775c0ca9d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -24,9 +24,7 @@ import scala.util.Random
import org.scalatest.FunSuite
import org.jblas.DoubleMatrix
-import org.apache.spark.SparkContext._
import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.mllib.recommendation.ALS.BlockStats
import org.apache.spark.storage.StorageLevel
object ALSSuite {
@@ -189,22 +187,6 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext {
testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, -1, false)
}
- test("analyze one user block and one product block") {
- val localRatings = Seq(
- Rating(0, 100, 1.0),
- Rating(0, 101, 2.0),
- Rating(0, 102, 3.0),
- Rating(1, 102, 4.0),
- Rating(2, 103, 5.0))
- val ratings = sc.makeRDD(localRatings, 2)
- val stats = ALS.analyzeBlocks(ratings, 1, 1)
- assert(stats.size === 2)
- assert(stats(0) === BlockStats("user", 0, 3, 5, 4, 3))
- assert(stats(1) === BlockStats("product", 0, 4, 5, 3, 4))
- }
-
- // TODO: add tests for analyzing multiple user/product blocks
-
/**
* Test if we can correctly factorize R = U * P where U and P are of known rank.
*