From d9203350b06a9c737421ba244162b1365402c01b Mon Sep 17 00:00:00 2001
From: Tor Myklebust
Date: Wed, 11 Jun 2014 18:16:33 -0700
Subject: [SPARK-1672][MLLIB] Separate user and product partitioning in ALS

Some cleanup work following #593.

1. Allow setting different numbers of user blocks and product blocks in `ALS`.
2. Update `MovieLensALS` to reflect the change.

Author: Tor Myklebust
Author: Xiangrui Meng

Closes #1014 from mengxr/SPARK-1672 and squashes the following commits:

0e910dd [Xiangrui Meng] change private[this] to private[recommendation]
36420c7 [Xiangrui Meng] set exclusion rules for ALS
9128b77 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-1672
294efe9 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-1672
9bab77b [Xiangrui Meng] clean up; add numUserBlocks and numProductBlocks to MovieLensALS
84c8e8c [Xiangrui Meng] Merge branch 'master' into SPARK-1672
d17a8bf [Xiangrui Meng] merge master
a4925fd [Tor Myklebust] Style.
bd8a75c [Tor Myklebust] Merge branch 'master' of github.com:apache/spark into alsseppar
021f54b [Tor Myklebust] Separate user and product blocks.
dcf583a [Tor Myklebust] Remove the partitioner member variable; instead, thread that needle everywhere it needs to go.
23d6f91 [Tor Myklebust] Stop making the partitioner configurable.
495784f [Tor Myklebust] Merge branch 'master' of https://github.com/apache/spark
674933a [Tor Myklebust] Fix style.
40edc23 [Tor Myklebust] Fix missing space.
f841345 [Tor Myklebust] Fix daft bug creating 'pairs', also for -> foreach.
5ec9e6c [Tor Myklebust] Clean a couple of things up using 'map'.
36a0f43 [Tor Myklebust] Make the partitioner private.
d872b09 [Tor Myklebust] Add negative id ALS test.
df27697 [Tor Myklebust] Support custom partitioners. Currently we use the same partitioner for users and products.
c90b6d8 [Tor Myklebust] Scramble user and product ids before bucketing.
c774d7d [Tor Myklebust] Make the partitioner a member variable and use it instead of modding directly.
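
To make the intent concrete before the diff: a hypothetical caller-side sketch (not part of this commit) of the two new knobs. The `ALS` setters, `Rating`, and `run` match the code added below; the SparkContext setup, input path, and block counts are illustrative assumptions.

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    val sc = new SparkContext("local[4]", "als-example") // illustrative context

    // Illustrative input: one "user,product,rating" triple per line.
    val ratings = sc.textFile("data/ratings.csv").map { line =>
      val Array(user, product, rating) = line.split(',')
      Rating(user.toInt, product.toInt, rating.toDouble)
    }

    // Users and products previously shared a single setBlocks count; after this
    // patch the two sides can be partitioned independently, e.g. when there are
    // far more products than users.
    val model = new ALS()
      .setRank(10)
      .setIterations(10)
      .setLambda(0.01)
      .setUserBlocks(4)      // -1 (the default) keeps the auto-configured count
      .setProductBlocks(16)
      .run(ratings)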
---
 .../apache/spark/examples/mllib/MovieLensALS.scala |  12 +-
 .../apache/spark/mllib/recommendation/ALS.scala    | 164 +++++++++++++--------
 .../spark/mllib/recommendation/ALSSuite.scala      |  60 +++++---
 project/MimaExcludes.scala                         |   8 +
 4 files changed, 160 insertions(+), 84 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 6eb41e7ba3..28e201d279 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -50,6 +50,8 @@ object MovieLensALS {
       numIterations: Int = 20,
       lambda: Double = 1.0,
       rank: Int = 10,
+      numUserBlocks: Int = -1,
+      numProductBlocks: Int = -1,
       implicitPrefs: Boolean = false)

   def main(args: Array[String]) {
@@ -67,8 +69,14 @@ object MovieLensALS {
         .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
         .action((x, c) => c.copy(lambda = x))
       opt[Unit]("kryo")
-        .text(s"use Kryo serialization")
+        .text("use Kryo serialization")
         .action((_, c) => c.copy(kryo = true))
+      opt[Int]("numUserBlocks")
+        .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
+        .action((x, c) => c.copy(numUserBlocks = x))
+      opt[Int]("numProductBlocks")
+        .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
+        .action((x, c) => c.copy(numProductBlocks = x))
       opt[Unit]("implicitPrefs")
         .text("use implicit preference")
         .action((_, c) => c.copy(implicitPrefs = true))
@@ -160,6 +168,8 @@ object MovieLensALS {
       .setIterations(params.numIterations)
       .setLambda(params.lambda)
       .setImplicitPrefs(params.implicitPrefs)
+      .setUserBlocks(params.numUserBlocks)
+      .setProductBlocks(params.numProductBlocks)
       .run(training)

     val rmse = computeRmse(model, test, params.implicitPrefs)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index d743bd7dd1..f1ae7b85b4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -61,7 +61,7 @@ private[recommendation] case class InLinkBlock(
  * A more compact class to represent a rating than Tuple3[Int, Int, Double].
  */
 @Experimental
-case class Rating(val user: Int, val product: Int, val rating: Double)
+case class Rating(user: Int, product: Int, rating: Double)

 /**
  * Alternating Least Squares matrix factorization.
@@ -93,7 +93,8 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
  * preferences rather than explicit ratings given to items.
  */
 class ALS private (
-    private var numBlocks: Int,
+    private var numUserBlocks: Int,
+    private var numProductBlocks: Int,
     private var rank: Int,
     private var iterations: Int,
     private var lambda: Double,
@@ -106,14 +107,31 @@ class ALS private (
    * Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10,
    * lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
    */
-  def this() = this(-1, 10, 10, 0.01, false, 1.0)
+  def this() = this(-1, -1, 10, 10, 0.01, false, 1.0)

   /**
-   * Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
-   * number of blocks. Default: -1.
+   * Set the number of blocks for both user blocks and product blocks to parallelize the computation
+   * into; pass -1 for an auto-configured number of blocks. Default: -1.
   */
   def setBlocks(numBlocks: Int): ALS = {
-    this.numBlocks = numBlocks
+    this.numUserBlocks = numBlocks
+    this.numProductBlocks = numBlocks
+    this
+  }
+
+  /**
+   * Set the number of user blocks to parallelize the computation.
+   */
+  def setUserBlocks(numUserBlocks: Int): ALS = {
+    this.numUserBlocks = numUserBlocks
+    this
+  }
+
+  /**
+   * Set the number of product blocks to parallelize the computation.
+   */
+  def setProductBlocks(numProductBlocks: Int): ALS = {
+    this.numProductBlocks = numProductBlocks
     this
   }
@@ -176,31 +194,32 @@ class ALS private (
   def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
     val sc = ratings.context

-    val numBlocks = if (this.numBlocks == -1) {
+    val numUserBlocks = if (this.numUserBlocks == -1) {
       math.max(sc.defaultParallelism, ratings.partitions.size / 2)
     } else {
-      this.numBlocks
+      this.numUserBlocks
     }
-
-    val partitioner = new Partitioner {
-      val numPartitions = numBlocks
-
-      def getPartition(x: Any): Int = {
-        Utils.nonNegativeMod(byteswap32(x.asInstanceOf[Int]), numPartitions)
-      }
+    val numProductBlocks = if (this.numProductBlocks == -1) {
+      math.max(sc.defaultParallelism, ratings.partitions.size / 2)
+    } else {
+      this.numProductBlocks
     }
+    val userPartitioner = new ALSPartitioner(numUserBlocks)
+    val productPartitioner = new ALSPartitioner(numProductBlocks)

-    val ratingsByUserBlock = ratings.map{ rating =>
-      (partitioner.getPartition(rating.user), rating)
+    val ratingsByUserBlock = ratings.map { rating =>
+      (userPartitioner.getPartition(rating.user), rating)
     }
-    val ratingsByProductBlock = ratings.map{ rating =>
-      (partitioner.getPartition(rating.product),
+    val ratingsByProductBlock = ratings.map { rating =>
+      (productPartitioner.getPartition(rating.product),
        Rating(rating.product, rating.user, rating.rating))
     }

-    val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
+    val (userInLinks, userOutLinks) =
+      makeLinkRDDs(numUserBlocks, numProductBlocks, ratingsByUserBlock, productPartitioner)
     val (productInLinks, productOutLinks) =
-      makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
+      makeLinkRDDs(numProductBlocks, numUserBlocks, ratingsByProductBlock, userPartitioner)
     userInLinks.setName("userInLinks")
     userOutLinks.setName("userOutLinks")
     productInLinks.setName("productInLinks")
@@ -232,27 +251,27 @@
         users.setName(s"users-$iter").persist()
         val YtY = Some(sc.broadcast(computeYtY(users)))
         val previousProducts = products
-        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
-          alpha, YtY)
+        products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
+          userPartitioner, rank, lambda, alpha, YtY)
         previousProducts.unpersist()
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
         products.setName(s"products-$iter").persist()
         val XtX = Some(sc.broadcast(computeYtY(products)))
         val previousUsers = users
-        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
-          alpha, XtX)
+        users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
+          productPartitioner, rank, lambda, alpha, XtX)
         previousUsers.unpersist()
       }
     } else {
       for (iter <- 1 to iterations) {
         // perform ALS update
         logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
-        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
-          alpha, YtY = None)
+        products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
+          userPartitioner, rank, lambda, alpha, YtY = None)
         products.setName(s"products-$iter")
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
-        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
-          alpha, YtY = None)
+        users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
+          productPartitioner, rank, lambda, alpha, YtY = None)
         users.setName(s"users-$iter")
       }
     }
@@ -340,9 +359,10 @@
   /**
    * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
    */
-  private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
-      outLinks: RDD[(Int, OutLinkBlock)]) = {
-    blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
+  private def unblockFactors(
+      blockedFactors: RDD[(Int, Array[Array[Double]])],
+      outLinks: RDD[(Int, OutLinkBlock)]): RDD[(Int, Array[Double])] = {
+    blockedFactors.join(outLinks).flatMap { case (b, (factors, outLinkBlock)) =>
       for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
     }
   }
@@ -351,14 +371,14 @@
    * Make the out-links table for a block of the users (or products) dataset given the list of
    * (user, product, rating) values for the users in that block (or the opposite for products).
    */
-  private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating],
-      partitioner: Partitioner): OutLinkBlock = {
+  private def makeOutLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
+      productPartitioner: Partitioner): OutLinkBlock = {
     val userIds = ratings.map(_.user).distinct.sorted
     val numUsers = userIds.length
     val userIdToPos = userIds.zipWithIndex.toMap
-    val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
+    val shouldSend = Array.fill(numUsers)(new BitSet(numProductBlocks))
     for (r <- ratings) {
-      shouldSend(userIdToPos(r.user))(partitioner.getPartition(r.product)) = true
+      shouldSend(userIdToPos(r.user))(productPartitioner.getPartition(r.product)) = true
     }
     OutLinkBlock(userIds, shouldSend)
   }
@@ -367,18 +387,17 @@
    * Make the in-links table for a block of the users (or products) dataset given a list of
    * (user, product, rating) values for the users in that block (or the opposite for products).
    */
-  private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating],
-      partitioner: Partitioner): InLinkBlock = {
+  private def makeInLinkBlock(numProductBlocks: Int, ratings: Array[Rating],
+      productPartitioner: Partitioner): InLinkBlock = {
     val userIds = ratings.map(_.user).distinct.sorted
-    val numUsers = userIds.length
     val userIdToPos = userIds.zipWithIndex.toMap
     // Split out our ratings by product block
-    val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
+    val blockRatings = Array.fill(numProductBlocks)(new ArrayBuffer[Rating])
     for (r <- ratings) {
-      blockRatings(partitioner.getPartition(r.product)) += r
+      blockRatings(productPartitioner.getPartition(r.product)) += r
     }
-    val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
-    for (productBlock <- 0 until numBlocks) {
+    val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numProductBlocks)
+    for (productBlock <- 0 until numProductBlocks) {
       // Create an array of (product, Seq(Rating)) ratings
       val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
       // Sort them by product ID
@@ -400,14 +419,16 @@
    * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
    * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
    */
-  private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)], partitioner: Partitioner)
-    : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
-  {
-    val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
+  private def makeLinkRDDs(
+      numUserBlocks: Int,
+      numProductBlocks: Int,
+      ratingsByUserBlock: RDD[(Int, Rating)],
+      productPartitioner: Partitioner): (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) = {
+    val grouped = ratingsByUserBlock.partitionBy(new HashPartitioner(numUserBlocks))
     val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
-      val ratings = elements.map{_._2}.toArray
-      val inLinkBlock = makeInLinkBlock(numBlocks, ratings, partitioner)
-      val outLinkBlock = makeOutLinkBlock(numBlocks, ratings, partitioner)
+      val ratings = elements.map(_._2).toArray
+      val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner)
+      val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner)
       Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
     }, true)
     val inLinks = links.mapValues(_._1)
@@ -439,26 +460,24 @@
    * It returns an RDD of new feature vectors for each user block.
    */
   private def updateFeatures(
+      numUserBlocks: Int,
       products: RDD[(Int, Array[Array[Double]])],
       productOutLinks: RDD[(Int, OutLinkBlock)],
       userInLinks: RDD[(Int, InLinkBlock)],
-      partitioner: Partitioner,
+      productPartitioner: Partitioner,
       rank: Int,
       lambda: Double,
       alpha: Double,
-      YtY: Option[Broadcast[DoubleMatrix]])
-    : RDD[(Int, Array[Array[Double]])] =
-  {
-    val numBlocks = products.partitions.size
+      YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
     productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
-      val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
-      for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
+      val toSend = Array.fill(numUserBlocks)(new ArrayBuffer[Array[Double]])
+      for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numUserBlocks) {
         if (outLinkBlock.shouldSend(p)(userBlock)) {
           toSend(userBlock) += factors(p)
         }
       }
       toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
-    }.groupByKey(partitioner)
+    }.groupByKey(productPartitioner)
       .join(userInLinks)
       .mapValues{ case (messages, inLinkBlock) =>
         updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
@@ -475,7 +494,7 @@
   {
     // Sort the incoming block factor messages by block ID and make them an array
     val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
-    val numBlocks = blockFactors.length
+    val numProductBlocks = blockFactors.length
     val numUsers = inLinkBlock.elementIds.length

     // We'll sum up the XtXes using vectors that represent only the lower-triangular part, since
@@ -490,7 +509,7 @@

     // Compute the XtX and Xy values for each user by adding products it rated in each product
     // block
-    for (productBlock <- 0 until numBlocks) {
+    for (productBlock <- 0 until numProductBlocks) {
       var p = 0
       while (p < blockFactors(productBlock).length) {
         val x = wrapDoubleArray(blockFactors(productBlock)(p))
@@ -579,6 +598,23 @@
   }
 }

+/**
+ * Partitioner for ALS.
+ */
+private[recommendation] class ALSPartitioner(override val numPartitions: Int) extends Partitioner {
+  override def getPartition(key: Any): Int = {
+    Utils.nonNegativeMod(byteswap32(key.asInstanceOf[Int]), numPartitions)
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case p: ALSPartitioner =>
+        this.numPartitions == p.numPartitions
+      case _ =>
+        false
+    }
+  }
+}

 /**
  * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
@@ -606,7 +642,7 @@ object ALS {
       blocks: Int,
       seed: Long
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
   }

   /**
@@ -629,7 +665,7 @@ object ALS {
       lambda: Double,
       blocks: Int
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0).run(ratings)
   }

   /**
@@ -689,7 +725,7 @@ object ALS {
       alpha: Double,
       seed: Long
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
   }

   /**
@@ -714,7 +750,7 @@ object ALS {
       blocks: Int,
       alpha: Double
     ): MatrixFactorizationModel = {
-    new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
+    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha).run(ratings)
   }

   /**

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 37c9b9d085..81bebec8c7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -121,6 +121,10 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     testALS(100, 200, 2, 15, 0.7, 0.4, true, false, true)
   }

+  test("rank-2 matrices with different user and product blocks") {
+    testALS(100, 200, 2, 15, 0.7, 0.4, numUserBlocks = 4, numProductBlocks = 2)
+  }
+
   test("pseudorandomness") {
     val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2)
     val model11 = ALS.train(ratings, 5, 1, 1.0, 2, 1)
@@ -153,35 +157,52 @@ class ALSSuite extends FunSuite with LocalSparkContext {
   }

   test("NNALS, rank 2") {
-    testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false)
+    testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, -1, false)
   }

   /**
    * Test if we can correctly factorize R = U * P where U and P are of known rank.
    *
-   * @param users number of users
-   * @param products number of products
-   * @param features number of features (rank of problem)
-   * @param iterations number of iterations to run
-   * @param samplingRate what fraction of the user-product pairs are known
+   * @param users          number of users
+   * @param products       number of products
+   * @param features       number of features (rank of problem)
+   * @param iterations     number of iterations to run
+   * @param samplingRate   what fraction of the user-product pairs are known
    * @param matchThreshold max difference allowed to consider a predicted rating correct
-   * @param implicitPrefs flag to test implicit feedback
-   * @param bulkPredict flag to test bulk prediciton
+   * @param implicitPrefs  flag to test implicit feedback
+   * @param bulkPredict    flag to test bulk prediction
    * @param negativeWeights whether the generated data can contain negative values
-   * @param numBlocks number of blocks to partition users and products into
+   * @param numUserBlocks  number of user blocks to partition users into
+   * @param numProductBlocks number of product blocks to partition products into
    * @param negativeFactors whether the generated user/product factors can have negative entries
    */
-  def testALS(users: Int, products: Int, features: Int, iterations: Int,
-    samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
-    bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
-    negativeFactors: Boolean = true)
-  {
+  def testALS(
+      users: Int,
+      products: Int,
+      features: Int,
+      iterations: Int,
+      samplingRate: Double,
+      matchThreshold: Double,
+      implicitPrefs: Boolean = false,
+      bulkPredict: Boolean = false,
+      negativeWeights: Boolean = false,
+      numUserBlocks: Int = -1,
+      numProductBlocks: Int = -1,
+      negativeFactors: Boolean = true) {
     val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
       features, samplingRate, implicitPrefs, negativeWeights, negativeFactors)
-    val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations)
-      .setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L)
-      .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings)))
+    val model = new ALS()
+      .setUserBlocks(numUserBlocks)
+      .setProductBlocks(numProductBlocks)
+      .setRank(features)
+      .setIterations(iterations)
+      .setAlpha(1.0)
+      .setImplicitPrefs(implicitPrefs)
+      .setLambda(0.01)
+      .setSeed(0L)
+      .setNonnegative(!negativeFactors)
+      .run(sc.parallelize(sampledRatings))

     val predictedU = new DoubleMatrix(users, features)
     for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
@@ -208,8 +229,9 @@ class ALSSuite extends FunSuite with LocalSparkContext {
           val prediction = predictedRatings.get(u, p)
           val correct = trueRatings.get(u, p)
           if (math.abs(prediction - correct) > matchThreshold) {
-            fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
-              u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP))
+            fail(("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s")
+              .format(u, p, correct, prediction, trueRatings, predictedRatings, predictedU,
+                predictedP))
           }
         }
       } else {

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dd7efceb23..c80ab9a9f8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,6 +54,14 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1") ) ++ + Seq( // Ignore some private methods in ALS. + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"), + ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments. + "org.apache.spark.mllib.recommendation.ALS.this"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$$default$7") + ) ++ MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++ MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++ MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") -- cgit v1.2.3