aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-08-06 21:21:55 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-08-06 21:21:55 -0700
commit338b7a7455c02371890590fb71eefaee587f9d0e (patch)
tree7e32093a00f87ff26239209ac2d006c5a8207ae4 /mllib
parent7db69d56f2d050842ecf6e465d2d4f1abf3314d7 (diff)
parent7c4b7a53b1b588c1d0d3e00e99d4d7c53dc1da3d (diff)
downloadspark-338b7a7455c02371890590fb71eefaee587f9d0e.tar.gz
spark-338b7a7455c02371890590fb71eefaee587f9d0e.tar.bz2
spark-338b7a7455c02371890590fb71eefaee587f9d0e.zip
Merge branch 'master' of git://github.com/mesos/spark into sgd-cleanup
Conflicts: mllib/src/main/scala/spark/mllib/util/MLUtils.scala
Diffstat (limited to 'mllib')
-rw-r--r--mllib/pom.xml165
-rw-r--r--mllib/src/main/scala/spark/mllib/clustering/KMeans.scala9
-rw-r--r--mllib/src/main/scala/spark/mllib/recommendation/ALS.scala1
-rw-r--r--mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala80
-rw-r--r--mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala113
-rw-r--r--mllib/src/main/scala/spark/mllib/util/MLUtils.scala4
6 files changed, 366 insertions, 6 deletions
diff --git a/mllib/pom.xml b/mllib/pom.xml
new file mode 100644
index 0000000000..f3928cc73d
--- /dev/null
+++ b/mllib/pom.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-mllib</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project ML Library</name>
+ <url>http://spark-project.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jblas</groupId>
+ <artifactId>jblas</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2-yarn</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2-yarn</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
index d875d6de50..b402c71ed2 100644
--- a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
@@ -315,14 +315,15 @@ object KMeans {
}
def main(args: Array[String]) {
- if (args.length != 4) {
- println("Usage: KMeans <master> <input_file> <k> <max_iterations>")
+ if (args.length < 4) {
+ println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]")
System.exit(1)
}
val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
+ val runs = if (args.length >= 5) args(4).toInt else 1
val sc = new SparkContext(master, "KMeans")
- val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble))
- val model = KMeans.train(data, k, iters)
+ val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache()
+ val model = KMeans.train(data, k, iters, runs)
val cost = model.computeCost(data)
println("Cluster centers:")
for (c <- model.clusterCenters) {
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 7281b2fcb9..6ecf0151a1 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -418,6 +418,7 @@ object ALS {
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
System.setProperty("spark.kryo.referenceTracking", "false")
+ System.setProperty("spark.kryoserializer.buffer.mb", "8")
System.setProperty("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS")
val ratings = sc.textFile(ratingsFile).map { line =>
diff --git a/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
new file mode 100644
index 0000000000..8f95cf7479
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import scala.util.Random
+
+import spark.{RDD, SparkContext}
+
+object KMeansDataGenerator {
+
+ /**
+ * Generate an RDD containing test data for KMeans. This function chooses k cluster centers
+ * from a d-dimensional Gaussian distribution scaled by factor r, then creates a Gaussian
+ * cluster with scale 1 around each center.
+ *
+ * @param sc SparkContext to use for creating the RDD
+ * @param numPoints Number of points that will be contained in the RDD
+ * @param k Number of clusters
+ * @param d Number of dimensions
+ * @parak r Scaling factor for the distribution of the initial centers
+ * @param numPartitions Number of partitions of the generated RDD; default 2
+ */
+ def generateKMeansRDD(
+ sc: SparkContext,
+ numPoints: Int,
+ k: Int,
+ d: Int,
+ r: Double,
+ numPartitions: Int = 2)
+ : RDD[Array[Double]] =
+ {
+ // First, generate some centers
+ val rand = new Random(42)
+ val centers = Array.fill(k)(Array.fill(d)(rand.nextGaussian() * r))
+ // Then generate points around each center
+ sc.parallelize(0 until numPoints, numPartitions).map { idx =>
+ val center = centers(idx % k)
+ val rand2 = new Random(42 + idx)
+ Array.tabulate(d)(i => center(i) + rand2.nextGaussian())
+ }
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 6) {
+ println("Usage: KMeansGenerator " +
+ "<master> <output_dir> <num_points> <k> <d> <r> [<num_partitions>]")
+ System.exit(1)
+ }
+
+ val sparkMaster = args(0)
+ val outputPath = args(1)
+ val numPoints = args(2).toInt
+ val k = args(3).toInt
+ val d = args(4).toInt
+ val r = args(5).toDouble
+ val parts = if (args.length >= 7) args(6).toInt else 2
+
+ val sc = new SparkContext(sparkMaster, "KMeansDataGenerator")
+ val data = generateKMeansRDD(sc, numPoints, k, d, r, parts)
+ data.map(_.mkString(" ")).saveAsTextFile(outputPath)
+
+ System.exit(0)
+ }
+}
+
diff --git a/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala
new file mode 100644
index 0000000000..88992cde0c
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.recommendation
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+/**
+* Generate RDD(s) containing data for Matrix Factorization.
+*
+* This method samples training entries according to the oversampling factor
+* 'trainSampFact', which is a multiplicative factor of the number of
+* degrees of freedom of the matrix: rank*(m+n-rank).
+*
+* It optionally samples entries for a testing matrix using
+* 'testSampFact', the percentage of the number of training entries
+* to use for testing.
+*
+* This method takes the following inputs:
+* sparkMaster (String) The master URL.
+* outputPath (String) Directory to save output.
+* m (Int) Number of rows in data matrix.
+* n (Int) Number of columns in data matrix.
+* rank (Int) Underlying rank of data matrix.
+* trainSampFact (Double) Oversampling factor.
+* noise (Boolean) Whether to add gaussian noise to training data.
+* sigma (Double) Standard deviation of added gaussian noise.
+* test (Boolean) Whether to create testing RDD.
+* testSampFact (Double) Percentage of training data to use as test data.
+*/
+
+object MFDataGenerator{
+
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ println("Usage: MFDataGenerator " +
+ "<master> <outputDir> [m] [n] [rank] [trainSampFact] [noise] [sigma] [test] [testSampFact]")
+ System.exit(1)
+ }
+
+ val sparkMaster: String = args(0)
+ val outputPath: String = args(1)
+ val m: Int = if (args.length > 2) args(2).toInt else 100
+ val n: Int = if (args.length > 3) args(3).toInt else 100
+ val rank: Int = if (args.length > 4) args(4).toInt else 10
+ val trainSampFact: Double = if (args.length > 5) args(5).toDouble else 1.0
+ val noise: Boolean = if (args.length > 6) args(6).toBoolean else false
+ val sigma: Double = if (args.length > 7) args(7).toDouble else 0.1
+ val test: Boolean = if (args.length > 8) args(8).toBoolean else false
+ val testSampFact: Double = if (args.length > 9) args(9).toDouble else 0.1
+
+ val sc = new SparkContext(sparkMaster, "MFDataGenerator")
+
+ val A = DoubleMatrix.randn(m, rank)
+ val B = DoubleMatrix.randn(rank, n)
+ val z = 1 / (scala.math.sqrt(scala.math.sqrt(rank)))
+ A.mmuli(z)
+ B.mmuli(z)
+ val fullData = A.mmul(B)
+
+ val df = rank * (m + n - rank)
+ val sampSize = scala.math.min(scala.math.round(trainSampFact * df),
+ scala.math.round(.99 * m * n)).toInt
+ val rand = new Random()
+ val mn = m * n
+ val shuffled = rand.shuffle(1 to mn toIterable)
+
+ val omega = shuffled.slice(0, sampSize)
+ val ordered = omega.sortWith(_ < _).toArray
+ val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
+ .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+
+ // optionally add gaussian noise
+ if (noise) {
+ trainData.map(x => (x._1, x._2, x._3 + rand.nextGaussian * sigma))
+ }
+
+ trainData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
+
+ // optionally generate testing data
+ if (test) {
+ val testSampSize = scala.math
+ .min(scala.math.round(sampSize * testSampFact),scala.math.round(mn - sampSize)).toInt
+ val testOmega = shuffled.slice(sampSize, sampSize + testSampSize)
+ val testOrdered = testOmega.sortWith(_ < _).toArray
+ val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered)
+ .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+ testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
+ }
+
+ sc.stop()
+
+ }
+} \ No newline at end of file
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
index e45eda2c99..9174e8cea7 100644
--- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -39,9 +39,9 @@ object MLUtils {
*/
def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
sc.textFile(dir).map { line =>
- val parts = line.split(",")
+ val parts = line.split(',')
val label = parts(0).toDouble
- val features = parts(1).trim().split(" ").map(_.toDouble)
+ val features = parts(1).trim().split(' ').map(_.toDouble)
LabeledPoint(label, features)
}
}