aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2015-07-02 10:18:23 -0700
committerXiangrui Meng <meng@databricks.com>2015-07-02 10:18:23 -0700
commit0e553a3e9360a736920e2214d634373fef0dbcf7 (patch)
tree317ac5efd679292c4aaadb91b06b03c3e122f229 /mllib
parent52302a803967114b29a8bf6b74459477364c5b88 (diff)
downloadspark-0e553a3e9360a736920e2214d634373fef0dbcf7.tar.gz
spark-0e553a3e9360a736920e2214d634373fef0dbcf7.tar.bz2
spark-0e553a3e9360a736920e2214d634373fef0dbcf7.zip
[SPARK-8708] [MLLIB] Paritition ALS ratings based on both users and products
JIRA: https://issues.apache.org/jira/browse/SPARK-8708 Previously the partitions of ratings are only based on the given products. So if the `usersProducts` given for prediction contains only few products or even one product, the generated ratings will be pushed into few or single partition and can't use high parallelism. The following codes are the example reported in the JIRA. Because it asks the predictions for users on product 2. There is only one partition in the result. >>> r1 = (1, 1, 1.0) >>> r2 = (1, 2, 2.0) >>> r3 = (2, 1, 2.0) >>> r4 = (2, 2, 2.0) >>> r5 = (3, 1, 1.0) >>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5) >>> users = ratings.map(itemgetter(0)).distinct() >>> model = ALS.trainImplicit(ratings, 1, seed=10) >>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2))) >>> predictions_for_2.glom().map(len).collect() [0, 0, 3, 0, 0] This PR uses user and product instead of only product to partition the ratings. Author: Liang-Chi Hsieh <viirya@gmail.com> Author: Liang-Chi Hsieh <viirya@appier.com> Closes #7121 from viirya/mfm_fix_partition and squashes the following commits: 779946d [Liang-Chi Hsieh] Calculate approximate numbers of users and products in one pass. 4336dc2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into mfm_fix_partition 83e56c1 [Liang-Chi Hsieh] Instead of additional join, use the numbers of users and products to decide how to perform join. b534dc8 [Liang-Chi Hsieh] Paritition ratings based on both users and products.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala55
1 files changed, 49 insertions, 6 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 93aa41e499..43d219a49c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -22,6 +22,7 @@ import java.lang.{Integer => JavaInteger}
import scala.collection.mutable
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.hadoop.fs.Path
import org.json4s._
@@ -80,6 +81,30 @@ class MatrixFactorizationModel(
}
/**
+ * Return approximate numbers of users and products in the given usersProducts tuples.
+ * This method is based on `countApproxDistinct` in class `RDD`.
+ *
+ * @param usersProducts RDD of (user, product) pairs.
+ * @return approximate numbers of users and products.
+ */
+ private[this] def countApproxDistinctUserProduct(usersProducts: RDD[(Int, Int)]): (Long, Long) = {
+ val zeroCounterUser = new HyperLogLogPlus(4, 0)
+ val zeroCounterProduct = new HyperLogLogPlus(4, 0)
+ val aggregated = usersProducts.aggregate((zeroCounterUser, zeroCounterProduct))(
+ (hllTuple: (HyperLogLogPlus, HyperLogLogPlus), v: (Int, Int)) => {
+ hllTuple._1.offer(v._1)
+ hllTuple._2.offer(v._2)
+ hllTuple
+ },
+ (h1: (HyperLogLogPlus, HyperLogLogPlus), h2: (HyperLogLogPlus, HyperLogLogPlus)) => {
+ h1._1.addAll(h2._1)
+ h1._2.addAll(h2._2)
+ h1
+ })
+ (aggregated._1.cardinality(), aggregated._2.cardinality())
+ }
+
+ /**
* Predict the rating of many users for many products.
* The output RDD has an element per each element in the input RDD (including all duplicates)
* unless a user or product is missing in the training set.
@@ -88,12 +113,30 @@ class MatrixFactorizationModel(
* @return RDD of Ratings.
*/
def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
- val users = userFeatures.join(usersProducts).map {
- case (user, (uFeatures, product)) => (product, (user, uFeatures))
- }
- users.join(productFeatures).map {
- case (product, ((user, uFeatures), pFeatures)) =>
- Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
+ // Previously the partitions of ratings are only based on the given products.
+ // So if the usersProducts given for prediction contains only few products or
+ // even one product, the generated ratings will be pushed into few or single partition
+ // and can't use high parallelism.
+ // Here we calculate approximate numbers of users and products. Then we decide the
+ // partitions should be based on users or products.
+ val (usersCount, productsCount) = countApproxDistinctUserProduct(usersProducts)
+
+ if (usersCount < productsCount) {
+ val users = userFeatures.join(usersProducts).map {
+ case (user, (uFeatures, product)) => (product, (user, uFeatures))
+ }
+ users.join(productFeatures).map {
+ case (product, ((user, uFeatures), pFeatures)) =>
+ Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
+ }
+ } else {
+ val products = productFeatures.join(usersProducts.map(_.swap)).map {
+ case (product, (pFeatures, user)) => (user, (product, pFeatures))
+ }
+ products.join(userFeatures).map {
+ case (user, ((product, pFeatures), uFeatures)) =>
+ Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
+ }
}
}