-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala                30
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala  38
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala                   33
-rw-r--r--  python/pyspark/mllib/_common.py                                                             25
-rw-r--r--  python/pyspark/mllib/recommendation.py                                                      12
5 files changed, 134 insertions, 4 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 8247c1ebc5..2d8623392e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -197,6 +197,7 @@ class PythonMLLibAPI extends Serializable {
return ret
}
+ /** Unpack a Rating object from an array of bytes */
private def unpackRating(ratingBytes: Array[Byte]): Rating = {
val bb = ByteBuffer.wrap(ratingBytes)
bb.order(ByteOrder.nativeOrder())
@@ -206,6 +207,35 @@ class PythonMLLibAPI extends Serializable {
return new Rating(user, product, rating)
}
+ /** Unpack a tuple of Ints from an array of bytes */
+ private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
+ val bb = ByteBuffer.wrap(tupleBytes)
+ bb.order(ByteOrder.nativeOrder())
+ val v1 = bb.getInt()
+ val v2 = bb.getInt()
+ (v1, v2)
+ }
+
+ /**
+ * Serialize a Rating object into an array of bytes.
+ * It can be deserialized using RatingDeserializer().
+ *
+ * @param rate the Rating object to serialize
+ * @return an array of bytes containing the packed user, product and rating
+ */
+ private[spark] def serializeRating(rate: Rating): Array[Byte] = {
+ val len = 3
+ val bytes = new Array[Byte](4 + 8 * len)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putInt(len)
+ val db = bb.asDoubleBuffer()
+ db.put(rate.user.toDouble)
+ db.put(rate.product.toDouble)
+ db.put(rate.rating)
+ bytes
+ }
+
/**
* Java stub for Python mllib ALS.train(). This stub returns a handle
* to the Java object instead of the content of the Java object. Extra care
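Note on the wire format: serializeRating above writes a native-order int32 length (always 3) followed by three native-order float64 values — user, product, rating — 28 bytes in total. Below is a minimal Python sketch of that layout, illustrative only and not part of this patch (pack_rating/unpack_rating are made-up names):

    import struct

    # "=" means native byte order with standard sizes: int32 + 3 doubles = 28
    # bytes, matching `new Array[Byte](4 + 8 * len)` with len = 3 in Scala.
    def pack_rating(user, product, rating):
        return struct.pack("=i3d", 3, float(user), float(product), float(rating))

    def unpack_rating(b):
        _, user, product, rating = struct.unpack("=i3d", b)
        return int(user), int(product), rating

    assert unpack_rating(pack_rating(1, 2, 5.0)) == (1, 2, 5.0)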
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index af43d89c70..443fc5de5b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -19,8 +19,11 @@ package org.apache.spark.mllib.recommendation
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.api.python.PythonMLLibAPI
import org.jblas._
+import org.apache.spark.api.java.JavaRDD
+
/**
* Model representing the result of matrix factorization.
@@ -44,6 +47,39 @@ class MatrixFactorizationModel(
userVector.dot(productVector)
}
- // TODO: Figure out what good bulk prediction methods would look like.
+ /**
+ * Predict the rating of many users for many products.
+ * The output RDD contains one element for each element in the input RDD
+ * (including all duplicates), unless a user or product is missing from the training set.
+ *
+ * @param usersProducts RDD of (user, product) pairs.
+ * @return RDD of Ratings.
+ */
+ def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
+ val users = userFeatures.join(usersProducts).map {
+ case (user, (uFeatures, product)) => (product, (user, uFeatures))
+ }
+ users.join(productFeatures).map {
+ case (product, ((user, uFeatures), pFeatures)) =>
+ val userVector = new DoubleMatrix(uFeatures)
+ val productVector = new DoubleMatrix(pFeatures)
+ Rating(user, product, userVector.dot(productVector))
+ }
+ }
+
+ /**
+ * Predict the rating of many users for many products.
+ * This is a Java stub for the Python predictAll().
+ *
+ * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
+ * @return JavaRDD of serialized Rating objects.
+ */
+ def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
+ val pythonAPI = new PythonMLLibAPI()
+ val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
+ predict(usersProducts).map(rate => pythonAPI.serializeRating(rate))
+ }
+
+ // TODO: Figure out what other good bulk prediction methods would look like.
// Probably want a way to get the top users for a product or vice-versa.
}
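The bulk predict above works in two joins: key the (user, product) pairs by user to attach the user's feature vector, re-key the result by product to attach the product's feature vector, then take the dot product of the two. A rough PySpark equivalent for illustration — not part of this patch, and it assumes user_features and product_features are RDDs of (id, feature-list) like userFeatures/productFeatures here:

    def bulk_predict(user_features, product_features, users_products):
        # (user, (uFeatures, product)) -> (product, (user, uFeatures))
        by_product = user_features.join(users_products).map(
            lambda kv: (kv[1][1], (kv[0], kv[1][0])))
        # (product, ((user, uFeatures), pFeatures)) -> (user, product, rating)
        return by_product.join(product_features).map(
            lambda kv: (kv[1][0][0], kv[0],
                        sum(a * b for a, b in zip(kv[1][0][1], kv[1][1]))))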
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index fafc5ec5f2..e683a90f57 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -90,18 +90,34 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
testALS(50, 100, 1, 15, 0.7, 0.3)
}
+ test("rank-1 matrices bulk") {
+ testALS(50, 100, 1, 15, 0.7, 0.3, false, true)
+ }
+
test("rank-2 matrices") {
testALS(100, 200, 2, 15, 0.7, 0.3)
}
+ test("rank-2 matrices bulk") {
+ testALS(100, 200, 2, 15, 0.7, 0.3, false, true)
+ }
+
test("rank-1 matrices implicit") {
testALS(80, 160, 1, 15, 0.7, 0.4, true)
}
+ test("rank-1 matrices implicit bulk") {
+ testALS(80, 160, 1, 15, 0.7, 0.4, true, true)
+ }
+
test("rank-2 matrices implicit") {
testALS(100, 200, 2, 15, 0.7, 0.4, true)
}
+ test("rank-2 matrices implicit bulk") {
+ testALS(100, 200, 2, 15, 0.7, 0.4, true, true)
+ }
+
/**
* Test if we can correctly factorize R = U * P where U and P are of known rank.
*
@@ -111,9 +127,12 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
* @param iterations number of iterations to run
* @param samplingRate what fraction of the user-product pairs are known
* @param matchThreshold max difference allowed to consider a predicted rating correct
+ * @param implicitPrefs flag to test implicit feedback
+ * @param bulkPredict flag to test bulk prediction
*/
def testALS(users: Int, products: Int, features: Int, iterations: Int,
- samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false)
+ samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
+ bulkPredict: Boolean = false)
{
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
features, samplingRate, implicitPrefs)
@@ -130,7 +149,17 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) {
predictedP.put(p, i, vec(i))
}
- val predictedRatings = predictedU.mmul(predictedP.transpose)
+ val predictedRatings = if (bulkPredict) {
+ val allRatings = new DoubleMatrix(users, products)
+ val usersProducts = for (u <- 0 until users; p <- 0 until products) yield (u, p)
+ val userProductsRDD = sc.parallelize(usersProducts)
+ model.predict(userProductsRDD).collect().foreach { elem =>
+ allRatings.put(elem.user, elem.product, elem.rating)
+ }
+ allRatings
+ } else {
+ predictedU.mmul(predictedP.transpose)
+ }
if (!implicitPrefs) {
for (u <- 0 until users; p <- 0 until products) {
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e74ba0fabc..769d88dfb9 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -18,6 +18,9 @@
from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape
from pyspark import SparkContext
+from pyspark.serializers import Serializer
+import struct
+
# Double vector format:
#
# [8-byte 1] [8-byte length] [length*8 bytes of data]
@@ -213,6 +216,28 @@ def _serialize_rating(r):
intpart[0], intpart[1], doublepart[0] = r
return ba
+class RatingDeserializer(Serializer):
+ def loads(self, stream):
+ length = struct.unpack("!i", stream.read(4))[0]
+ ba = stream.read(length)
+ res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4)
+ return int(res[0]), int(res[1]), res[2]
+
+ def load_stream(self, stream):
+ while True:
+ try:
+ yield self.loads(stream)
+ except struct.error:
+ return
+ except EOFError:
+ return
+
+def _serialize_tuple(t):
+ ba = bytearray(8)
+ intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
+ intpart[0], intpart[1] = t
+ return ba
+
def _test():
import doctest
globs = globals().copy()
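Note: _serialize_tuple above packs just two native-order int32s (8 bytes); unpackTuple on the Scala side reads them back with two getInt() calls on a native-order ByteBuffer. For illustration only (not part of this patch), the same layout via struct:

    import struct

    def pack_int_pair(t):
        # two native-order int32s — the same 8-byte layout as the ndarray version
        return struct.pack("=2i", t[0], t[1])

    assert len(pack_int_pair((1, 2))) == 8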
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 14d06cba21..0eeb5bb66b 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -20,7 +20,9 @@ from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper
+ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
+ _serialize_tuple, RatingDeserializer
+from pyspark.rdd import RDD
class MatrixFactorizationModel(object):
"""A matrix factorisation model trained by regularized alternating
@@ -33,6 +35,9 @@ class MatrixFactorizationModel(object):
>>> model = ALS.trainImplicit(sc, ratings, 1)
>>> model.predict(2,2) is not None
True
+ >>> testset = sc.parallelize([(1, 2), (1, 1)])
+ >>> model.predictAll(testset).count() == 2
+ True
"""
def __init__(self, sc, java_model):
@@ -45,6 +50,11 @@ class MatrixFactorizationModel(object):
def predict(self, user, product):
return self._java_model.predict(user, product)
+ def predictAll(self, usersProducts):
+ usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
+ return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
+ self._context, RatingDeserializer())
+
class ALS(object):
@classmethod
def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
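End to end, the new Python API reads as below — a usage sketch only, assuming a live SparkContext named sc, mirroring the doctest above:

    from pyspark.mllib.recommendation import ALS

    ratings = sc.parallelize([(1, 1, 5.0), (1, 2, 1.0), (2, 1, 1.0), (2, 2, 5.0)])
    model = ALS.train(sc, ratings, rank=1, iterations=10)

    # predictAll ships the (user, product) pairs to the JVM as packed tuples
    # and streams serialized Ratings back through RatingDeserializer
    testset = sc.parallelize([(1, 2), (2, 1)])
    for user, product, rating in model.predictAll(testset).collect():
        print(user, product, rating)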