aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
Diffstat (limited to 'mllib')
-rw-r--r--mllib/pom.xml105
-rw-r--r--mllib/src/main/scala/spark/mllib/recommendation/ALS.scala32
2 files changed, 29 insertions, 108 deletions
diff --git a/mllib/pom.xml b/mllib/pom.xml
index a07480fbe2..ab31d5734e 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -33,6 +33,11 @@
<dependencies>
<dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
@@ -41,7 +46,6 @@
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
-
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
@@ -68,103 +72,4 @@
</plugin>
</plugins>
</build>
-
- <profiles>
- <profile>
- <id>hadoop1</id>
- <dependencies>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- <classifier>hadoop1</classifier>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop1</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>hadoop2</id>
- <dependencies>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- <classifier>hadoop2</classifier>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop2</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- <classifier>hadoop2-yarn</classifier>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop2-yarn</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
</project>
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 6c71dc1f32..dbfbf59975 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -123,10 +123,28 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
- // Initialize user and product factors randomly
- val seed = new Random().nextInt()
- var users = userOutLinks.mapValues(_.elementIds.map(u => randomFactor(rank, seed ^ u)))
- var products = productOutLinks.mapValues(_.elementIds.map(p => randomFactor(rank, seed ^ ~p)))
+ // Initialize user and product factors randomly, but use a deterministic seed for each partition
+ // so that fault recovery works
+ val seedGen = new Random()
+ val seed1 = seedGen.nextInt()
+ val seed2 = seedGen.nextInt()
+ // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
+ def hash(x: Int): Int = {
+ val r = x ^ (x >>> 20) ^ (x >>> 12)
+ r ^ (r >>> 7) ^ (r >>> 4)
+ }
+ var users = userOutLinks.mapPartitionsWithIndex { (index, itr) =>
+ val rand = new Random(hash(seed1 ^ index))
+ itr.map { case (x, y) =>
+ (x, y.elementIds.map(_ => randomFactor(rank, rand)))
+ }
+ }
+ var products = productOutLinks.mapPartitionsWithIndex { (index, itr) =>
+ val rand = new Random(hash(seed2 ^ index))
+ itr.map { case (x, y) =>
+ (x, y.elementIds.map(_ => randomFactor(rank, rand)))
+ }
+ }
for (iter <- 0 until iterations) {
// perform ALS update
@@ -213,11 +231,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
}
/**
- * Make a random factor vector with the given seed.
- * TODO: Initialize things using mapPartitionsWithIndex to make it faster?
+ * Make a random factor vector with the given random.
*/
- private def randomFactor(rank: Int, seed: Int): Array[Double] = {
- val rand = new Random(seed)
+ private def randomFactor(rank: Int, rand: Random): Array[Double] = {
Array.fill(rank)(rand.nextDouble)
}