author    Matei Zaharia <matei.zaharia@gmail.com>  2013-07-14 07:59:50 +0000
committer Matei Zaharia <matei.zaharia@gmail.com>  2013-07-14 07:59:50 +0000
commit    c5c38d1987137a1dc5eb66dd1065735a542ef9b5 (patch)
tree      b3ee27a914e057218f12e6c6608765e6935693a9 /mllib
parent    b91a218cea5a7ab4037675667922fc06bfec6fbf (diff)
Some optimizations to loading phase of ALS
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/spark/mllib/recommendation/ALS.scala | 45
1 file changed, 32 insertions(+), 13 deletions(-)
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 6c9fb2359c..dec3701ec0 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -2,6 +2,7 @@ package spark.mllib.recommendation
import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.util.Random
+import scala.util.Sorting
import spark.{HashPartitioner, Partitioner, SparkContext, RDD}
import spark.storage.StorageLevel
@@ -34,6 +35,12 @@ private[recommendation] case class InLinkBlock(
/**
+ * A more compact class to represent a rating than Tuple3[Int, Int, Double].
+ */
+private[recommendation] case class Rating(user: Int, product: Int, rating: Double)
+
+
+/**
* Alternating Least Squares matrix factorization.
*
* This is a blocked implementation of the ALS factorization algorithm that groups the two sets
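The Rating case class added above replaces Tuple3[Int, Int, Double]. Because Tuple3 is generic, its Int and Double elements are stored as boxed java.lang.Integer / java.lang.Double references, while the dedicated case class keeps the same fields as unboxed primitives. A minimal standalone sketch (not part of this patch) contrasting the two representations:

object RatingRepr {
  // Sketch only: same fields, but the case class stores them unboxed,
  // whereas the generic tuple holds three references to boxed values.
  case class Rating(user: Int, product: Int, rating: Double)

  def main(args: Array[String]): Unit = {
    val asTuple: (Int, Int, Double) = (1, 7, 4.5) // object header + 3 refs + 3 boxes
    val asRating = Rating(1, 7, 4.5)              // object header + 2 ints + 1 double
    println(s"tuple: $asTuple, case class: $asRating")
  }
}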
@@ -126,13 +133,13 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* Make the out-links table for a block of the users (or products) dataset given the list of
* (user, product, rating) values for the users in that block (or the opposite for products).
*/
- private def makeOutLinkBlock(numBlocks: Int, ratings: Array[(Int, Int, Double)]): OutLinkBlock = {
- val userIds = ratings.map(_._1).distinct.sorted
+ private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating]): OutLinkBlock = {
+ val userIds = ratings.map(_.user).distinct.sorted
val numUsers = userIds.length
val userIdToPos = userIds.zipWithIndex.toMap
val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
- for ((u, p, r) <- ratings) {
- shouldSend(userIdToPos(u))(p % numBlocks) = true
+ for (r <- ratings) {
+ shouldSend(userIdToPos(r.user))(r.product % numBlocks) = true
}
OutLinkBlock(userIds, shouldSend)
}
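The out-link logic amounts to: for each user in the block, record which product blocks contain at least one product that user rated, since those blocks will need the user's factor vector. A self-contained sketch of that pattern, with made-up data and block count:

import scala.collection.mutable.BitSet

// Sketch of the out-link bookkeeping: shouldSend(i) records which product
// blocks user i's ratings touch, i.e. the blocks that will need user i's
// factor vector. Data and block count are made up for illustration.
object OutLinkSketch {
  case class Rating(user: Int, product: Int, rating: Double)

  def main(args: Array[String]): Unit = {
    val numBlocks = 3
    val ratings = Array(Rating(10, 0, 5.0), Rating(10, 4, 3.0), Rating(20, 2, 1.0))
    val userIds = ratings.map(_.user).distinct.sorted
    val userIdToPos = userIds.zipWithIndex.toMap
    val shouldSend = Array.fill(userIds.length)(new BitSet(numBlocks))
    for (r <- ratings) {
      shouldSend(userIdToPos(r.user))(r.product % numBlocks) = true
    }
    // user 10 rated products 0 and 4 -> blocks 0 and 1; user 20 -> block 2
    userIds.zip(shouldSend).foreach { case (u, bs) => println(s"user $u -> $bs") }
  }
}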
@@ -141,18 +148,28 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* Make the in-links table for a block of the users (or products) dataset given a list of
* (user, product, rating) values for the users in that block (or the opposite for products).
*/
- private def makeInLinkBlock(numBlocks: Int, ratings: Array[(Int, Int, Double)]): InLinkBlock = {
- val userIds = ratings.map(_._1).distinct.sorted
+ private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating]): InLinkBlock = {
+ val userIds = ratings.map(_.user).distinct.sorted
val numUsers = userIds.length
val userIdToPos = userIds.zipWithIndex.toMap
+ // Split out our ratings by product block
+ val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
+ for (r <- ratings) {
+ blockRatings(r.product % numBlocks) += r
+ }
val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
for (productBlock <- 0 until numBlocks) {
- val ratingsInBlock = ratings.filter(t => t._2 % numBlocks == productBlock)
- val ratingsByProduct = ratingsInBlock.groupBy(_._2) // (p, Seq[(u, p, r)])
- .toArray
- .sortBy(_._1)
- .map{case (p, rs) => (rs.map(t => userIdToPos(t._1)), rs.map(_._3))}
- ratingsForBlock(productBlock) = ratingsByProduct
+ // Create an array of (product, Seq(Rating)) ratings
+ val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
+ // Sort them by product ID
+ val ordering = new Ordering[(Int, ArrayBuffer[Rating])] {
+ def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int = a._1 - b._1
+ }
+ Sorting.quickSort(groupedRatings)(ordering)
+ // Translate the user IDs to indices based on userIdToPos
+ ratingsForBlock(productBlock) = groupedRatings.map { case (p, rs) =>
+ (rs.view.map(r => userIdToPos(r.user)).toArray, rs.view.map(_.rating).toArray)
+ }
}
InLinkBlock(userIds, ratingsForBlock)
}
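This hunk carries the main loading-phase optimization. The old code rescanned the whole ratings array once per product block (a filter inside the loop, O(n * numBlocks) work in total) and sorted with sortBy, which allocates a key per element. The new code buckets every rating into its product block in one O(n) pass, then sorts each bucket's (product, ratings) groups in place with Sorting.quickSort and an explicit Ordering. A standalone sketch of that pattern, with illustrative data:

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

// Sketch of the loading-phase optimization: one O(n) pass buckets ratings by
// product block, replacing the old O(n * numBlocks) filter-per-block scan;
// each bucket is then grouped by product and sorted in place.
object InLinkSketch {
  case class Rating(user: Int, product: Int, rating: Double)

  def main(args: Array[String]): Unit = {
    val numBlocks = 2
    val ratings = Array(Rating(1, 3, 4.0), Rating(2, 0, 2.0), Rating(1, 2, 5.0))
    // Single pass: append each rating to the bucket for its product block.
    val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
    for (r <- ratings) blockRatings(r.product % numBlocks) += r
    for (block <- 0 until numBlocks) {
      val grouped = blockRatings(block).groupBy(_.product).toArray
      // quickSort with an explicit Ordering sorts the array in place, without
      // the per-element key allocation that sortBy incurs.
      Sorting.quickSort(grouped)(Ordering.by((p: (Int, ArrayBuffer[Rating])) => p._1))
      println(s"block $block products: ${grouped.map(_._1).mkString(", ")}")
    }
  }
}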
@@ -167,7 +184,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
{
val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
- val ratings = elements.map(_._2).toArray
+ val ratings = elements.map{case (k, t) => Rating(t._1, t._2, t._3)}.toArray
val inLinkBlock = makeInLinkBlock(numBlocks, ratings)
val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
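Here the partition's tuples are materialized into an Array[Rating] exactly once, and the same array is handed to both makeInLinkBlock and makeOutLinkBlock; an Iterator could only have been consumed by one of them. A minimal sketch of the materialize-once pattern, with no Spark dependency and made-up data:

// Sketch of the materialize-once pattern: an Iterator can be traversed only
// once, so the partition's tuples are converted to an Array of compact
// Ratings a single time and shared by both downstream consumers.
object LinkBlockSketch {
  case class Rating(user: Int, product: Int, rating: Double)

  def main(args: Array[String]): Unit = {
    // Stand-in for a partition's (key, (user, product, rating)) elements.
    val elements: Iterator[(Int, (Int, Int, Double))] =
      Iterator((0, (1, 3, 4.0)), (0, (2, 0, 2.0)))
    val ratings = elements.map { case (_, t) => Rating(t._1, t._2, t._3) }.toArray
    val users    = ratings.map(_.user).distinct    // first consumer
    val products = ratings.map(_.product).distinct // second consumer
    println(s"users: ${users.mkString(",")}  products: ${products.mkString(",")}")
  }
}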
@@ -373,6 +390,8 @@ object ALS {
}
val (master, ratingsFile, rank, iters, outputDir) =
(args(0), args(1), args(2).toInt, args(3).toInt, args(4))
+ System.setProperty("spark.serializer", "spark.KryoSerializer")
+ System.setProperty("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS")
val ratings = sc.textFile(ratingsFile).map { line =>
val fields = line.split(',')
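The two System.setProperty calls configure the job before the SparkContext is created, in the style of this Spark version: Kryo serialization replaces default Java serialization for shuffled data, and a 10000 ms locality wait gives the scheduler more time to place tasks on data-local nodes. A minimal sketch of that driver-side setup (the local master string and the trailing steps are placeholders):

import spark.SparkContext

// Sketch of the driver-side setup: in this Spark version, configuration is
// passed through system properties set before the SparkContext is built.
object AlsDriverSketch {
  def main(args: Array[String]): Unit = {
    // Use Kryo instead of default Java serialization for shuffled records.
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    // Wait up to 10000 ms for a data-local slot before scheduling remotely.
    System.setProperty("spark.locality.wait", "10000")
    val sc = new SparkContext("local", "ALS") // master string is a placeholder
    // ... load ratings, train with ALS, save the factors ...
    sc.stop()
  }
}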