From 0e553a3e9360a736920e2214d634373fef0dbcf7 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 2 Jul 2015 10:18:23 -0700 Subject: [SPARK-8708] [MLLIB] Paritition ALS ratings based on both users and products JIRA: https://issues.apache.org/jira/browse/SPARK-8708 Previously the partitions of ratings are only based on the given products. So if the `usersProducts` given for prediction contains only few products or even one product, the generated ratings will be pushed into few or single partition and can't use high parallelism. The following codes are the example reported in the JIRA. Because it asks the predictions for users on product 2. There is only one partition in the result. >>> r1 = (1, 1, 1.0) >>> r2 = (1, 2, 2.0) >>> r3 = (2, 1, 2.0) >>> r4 = (2, 2, 2.0) >>> r5 = (3, 1, 1.0) >>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5) >>> users = ratings.map(itemgetter(0)).distinct() >>> model = ALS.trainImplicit(ratings, 1, seed=10) >>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2))) >>> predictions_for_2.glom().map(len).collect() [0, 0, 3, 0, 0] This PR uses user and product instead of only product to partition the ratings. Author: Liang-Chi Hsieh Author: Liang-Chi Hsieh Closes #7121 from viirya/mfm_fix_partition and squashes the following commits: 779946d [Liang-Chi Hsieh] Calculate approximate numbers of users and products in one pass. 4336dc2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into mfm_fix_partition 83e56c1 [Liang-Chi Hsieh] Instead of additional join, use the numbers of users and products to decide how to perform join. b534dc8 [Liang-Chi Hsieh] Paritition ratings based on both users and products. --- .../recommendation/MatrixFactorizationModel.scala | 55 +++++++++++++++++++--- 1 file changed, 49 insertions(+), 6 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 93aa41e499..43d219a49c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -22,6 +22,7 @@ import java.lang.{Integer => JavaInteger} import scala.collection.mutable +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.hadoop.fs.Path import org.json4s._ @@ -79,6 +80,30 @@ class MatrixFactorizationModel( blas.ddot(rank, userVector, 1, productVector, 1) } + /** + * Return approximate numbers of users and products in the given usersProducts tuples. + * This method is based on `countApproxDistinct` in class `RDD`. + * + * @param usersProducts RDD of (user, product) pairs. + * @return approximate numbers of users and products. + */ + private[this] def countApproxDistinctUserProduct(usersProducts: RDD[(Int, Int)]): (Long, Long) = { + val zeroCounterUser = new HyperLogLogPlus(4, 0) + val zeroCounterProduct = new HyperLogLogPlus(4, 0) + val aggregated = usersProducts.aggregate((zeroCounterUser, zeroCounterProduct))( + (hllTuple: (HyperLogLogPlus, HyperLogLogPlus), v: (Int, Int)) => { + hllTuple._1.offer(v._1) + hllTuple._2.offer(v._2) + hllTuple + }, + (h1: (HyperLogLogPlus, HyperLogLogPlus), h2: (HyperLogLogPlus, HyperLogLogPlus)) => { + h1._1.addAll(h2._1) + h1._2.addAll(h2._2) + h1 + }) + (aggregated._1.cardinality(), aggregated._2.cardinality()) + } + /** * Predict the rating of many users for many products. * The output RDD has an element per each element in the input RDD (including all duplicates) @@ -88,12 +113,30 @@ class MatrixFactorizationModel( * @return RDD of Ratings. */ def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = { - val users = userFeatures.join(usersProducts).map { - case (user, (uFeatures, product)) => (product, (user, uFeatures)) - } - users.join(productFeatures).map { - case (product, ((user, uFeatures), pFeatures)) => - Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1)) + // Previously the partitions of ratings are only based on the given products. + // So if the usersProducts given for prediction contains only few products or + // even one product, the generated ratings will be pushed into few or single partition + // and can't use high parallelism. + // Here we calculate approximate numbers of users and products. Then we decide the + // partitions should be based on users or products. + val (usersCount, productsCount) = countApproxDistinctUserProduct(usersProducts) + + if (usersCount < productsCount) { + val users = userFeatures.join(usersProducts).map { + case (user, (uFeatures, product)) => (product, (user, uFeatures)) + } + users.join(productFeatures).map { + case (product, ((user, uFeatures), pFeatures)) => + Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1)) + } + } else { + val products = productFeatures.join(usersProducts.map(_.swap)).map { + case (product, (pFeatures, user)) => (user, (product, pFeatures)) + } + products.join(userFeatures).map { + case (user, ((product, pFeatures), uFeatures)) => + Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1)) + } } } -- cgit v1.2.3