-rw-r--r--  .gitignore | 1
-rw-r--r--  README.md | 16
-rw-r--r--  assembly/lib/PY4J_LICENSE.txt | 27
-rw-r--r--  assembly/lib/PY4J_VERSION.txt | 1
-rw-r--r--  assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.jar (renamed from python/lib/py4j0.7.jar) | bin 103286 -> 103286 bytes
-rw-r--r--  assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.pom | 9
-rw-r--r--  assembly/lib/net/sf/py4j/py4j/maven-metadata-local.xml | 12
-rw-r--r--  assembly/pom.xml | 138
-rw-r--r--  assembly/src/main/assembly/assembly.xml | 19
-rwxr-xr-x  bin/compute-classpath.sh | 82
-rwxr-xr-x  bin/spark-daemon.sh | 8
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 1
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala | 22
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala | 4
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 3
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 4
-rw-r--r--  core/src/main/scala/spark/ui/UIWorkloadGenerator.scala | 2
-rw-r--r--  core/src/test/scala/spark/DriverSuite.scala | 2
-rw-r--r--  docs/bagel-programming-guide.md | 2
-rw-r--r--  docs/building-with-maven.md | 59
-rw-r--r--  docs/index.md | 6
-rw-r--r--  docs/java-programming-guide.md | 2
-rw-r--r--  docs/python-programming-guide.md | 2
-rw-r--r--  docs/quick-start.md | 2
-rw-r--r--  docs/running-on-yarn.md | 18
-rw-r--r--  docs/scala-programming-guide.md | 2
-rw-r--r--  docs/spark-standalone.md | 4
-rw-r--r--  docs/streaming-programming-guide.md | 4
-rw-r--r--  examples/pom.xml | 67
-rw-r--r--  examples/src/main/scala/spark/examples/bagel/PageRankUtils.scala (renamed from bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala) | 2
-rw-r--r--  examples/src/main/scala/spark/examples/bagel/WikipediaPageRank.scala (renamed from bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala) | 2
-rw-r--r--  examples/src/main/scala/spark/examples/bagel/WikipediaPageRankStandalone.scala (renamed from bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala) | 2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala | 4
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala | 4
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala | 4
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 6
-rwxr-xr-x  make-distribution.sh | 22
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala | 31
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/SVM.scala | 33
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Gradient.scala | 16
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 23
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/Lasso.scala | 54
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala | 167
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala | 288
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/DataValidators.scala | 42
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala | 48
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala (renamed from mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala) | 91
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/MLUtils.scala | 10
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala | 9
-rw-r--r--  mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java | 19
-rw-r--r--  mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java | 94
-rw-r--r--  mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java | 110
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala | 45
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala | 39
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala | 72
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala | 64
-rw-r--r--  pom.xml | 18
-rw-r--r--  project/SparkBuild.scala | 56
-rw-r--r--  project/build.properties | 2
-rw-r--r--  project/plugins.sbt | 2
-rwxr-xr-x  pyspark | 19
-rw-r--r--  python/pyspark/java_gateway.py | 2
-rw-r--r--  python/pyspark/rdd.py | 37
-rwxr-xr-x  run-example | 81
-rwxr-xr-x  sbt/sbt | 2
-rwxr-xr-x  spark-class (renamed from run) | 93
-rwxr-xr-x  spark-executor | 2
-rwxr-xr-x  spark-shell | 3
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala | 18
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/Client.scala | 8
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala | 9
77 files changed, 1523 insertions(+), 663 deletions(-)
diff --git a/.gitignore b/.gitignore
index 00fbff6a2c..e1f64a1133 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,3 +40,4 @@ checkpoint
derby.log
dist/
spark-*-bin.tar.gz
+unit-tests.log
diff --git a/README.md b/README.md
index e5f527b84a..89b5a0abfd 100644
--- a/README.md
+++ b/README.md
@@ -16,20 +16,20 @@ Spark requires Scala 2.9.3 (Scala 2.10 is not yet supported). The project is
built using Simple Build Tool (SBT), which is packaged with it. To build
Spark and its example programs, run:
- sbt/sbt package assembly
+ sbt/sbt assembly
Spark also supports building using Maven. If you would like to build using Maven,
see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html)
-in the spark documentation..
+in the Spark documentation.
To run Spark, you will need to have Scala's bin directory in your `PATH`, or
you will need to set the `SCALA_HOME` environment variable to point to where
you've installed Scala. Scala must be accessible through one of these
methods on your cluster's worker nodes as well as its master.
-To run one of the examples, use `./run <class> <params>`. For example:
+To run one of the examples, use `./run-example <class> <params>`. For example:
- ./run spark.examples.SparkLR local[2]
+ ./run-example spark.examples.SparkLR local[2]
will run the Logistic Regression example locally on 2 CPUs.
@@ -52,19 +52,19 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop
versions without YARN, use:
# Apache Hadoop 1.2.1
- $ SPARK_HADOOP_VERSION=1.2.1 sbt/sbt package assembly
+ $ SPARK_HADOOP_VERSION=1.2.1 sbt/sbt assembly
# Cloudera CDH 4.2.0 with MapReduce v1
- $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt package assembly
+ $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_WITH_YARN=true`:
# Apache Hadoop 2.0.5-alpha
- $ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true sbt/sbt package assembly
+ $ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true sbt/sbt assembly
# Cloudera CDH 4.2.0 with MapReduce v2
- $ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_WITH_YARN=true sbt/sbt package assembly
+ $ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_WITH_YARN=true sbt/sbt assembly
For convenience, these variables may also be set through the `conf/spark-env.sh` file
described below.
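For context on the `run-example` script referenced above, here is a minimal sketch of the kind of standalone program it launches, assuming the 0.8-era API under the `spark` package (the object name and sample size are illustrative, not part of this commit):

    package spark.examples

    import scala.math.random
    import spark.SparkContext

    // Hypothetical Pi estimator in the style of the bundled examples; once
    // `sbt/sbt assembly` has been run, it would be started with something like
    // `./run-example spark.examples.PiSketch local[2]`.
    object PiSketch {
      def main(args: Array[String]) {
        val master = if (args.length > 0) args(0) else "local[2]"
        val sc = new SparkContext(master, "PiSketch")
        val n = 100000
        val inside = sc.parallelize(1 to n).map { _ =>
          val (x, y) = (random * 2 - 1, random * 2 - 1)
          if (x * x + y * y < 1) 1 else 0
        }.reduce(_ + _)
        println("Pi is roughly " + 4.0 * inside / n)
        sc.stop()
      }
    }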
diff --git a/assembly/lib/PY4J_LICENSE.txt b/assembly/lib/PY4J_LICENSE.txt
new file mode 100644
index 0000000000..a70279ca14
--- /dev/null
+++ b/assembly/lib/PY4J_LICENSE.txt
@@ -0,0 +1,27 @@
+
+Copyright (c) 2009-2011, Barthelemy Dagenais All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+- Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+- Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+- The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git a/assembly/lib/PY4J_VERSION.txt b/assembly/lib/PY4J_VERSION.txt
new file mode 100644
index 0000000000..04a0cd52a8
--- /dev/null
+++ b/assembly/lib/PY4J_VERSION.txt
@@ -0,0 +1 @@
+b7924aabe9c5e63f0a4d8bbd17019534c7ec014e
diff --git a/python/lib/py4j0.7.jar b/assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.jar
index 73b7ddb7d1..73b7ddb7d1 100644
--- a/python/lib/py4j0.7.jar
+++ b/assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.jar
Binary files differ
diff --git a/assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.pom b/assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.pom
new file mode 100644
index 0000000000..1c730e19b4
--- /dev/null
+++ b/assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.pom
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ <version>0.7</version>
+ <description>POM was created from install:install-file</description>
+</project>
diff --git a/assembly/lib/net/sf/py4j/py4j/maven-metadata-local.xml b/assembly/lib/net/sf/py4j/py4j/maven-metadata-local.xml
new file mode 100644
index 0000000000..6942ff45e7
--- /dev/null
+++ b/assembly/lib/net/sf/py4j/py4j/maven-metadata-local.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<metadata>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ <versioning>
+ <release>0.7</release>
+ <versions>
+ <version>0.7</version>
+ </versions>
+ <lastUpdated>20130828020333</lastUpdated>
+ </versioning>
+</metadata>
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ca20ccadba..74990b6361 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -1,4 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -13,29 +30,13 @@
<name>Spark Project Assembly</name>
<url>http://spark-project.org/</url>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
- <executions>
- <execution>
- <id>dist</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/assembly.xml</descriptor>
- </descriptors>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <repositories>
+ <!-- A repository in the local filesystem for the Py4J JAR, which is not in Maven central -->
+ <repository>
+ <id>lib</id>
+ <url>file://${project.basedir}/lib</url>
+ </repository>
+ </repositories>
<dependencies>
<dependency>
@@ -63,5 +64,96 @@
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ <version>0.7</version>
+ </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <!-- Use the shade plugin to create a big JAR with all the dependencies -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <outputFile>${project.build.directory}/scala-${scala.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</outputFile>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-yarn</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>bigtop-dist</id>
+ <!-- This profile uses the assembly plugin to create a special "dist" package for BigTop
+ that contains Spark but not the Hadoop JARs it depends on. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>dist</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/assembly/src/main/assembly/assembly.xml b/assembly/src/main/assembly/assembly.xml
index 14485b7181..4543b52c93 100644
--- a/assembly/src/main/assembly/assembly.xml
+++ b/assembly/src/main/assembly/assembly.xml
@@ -1,3 +1,19 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
<assembly>
<id>dist</id>
<formats>
@@ -36,7 +52,8 @@
</directory>
<outputDirectory>/bin</outputDirectory>
<includes>
- <include>run*</include>
+ <include>run-example*</include>
+ <include>spark-class*</include>
<include>spark-shell*</include>
<include>spark-executor*</include>
</includes>
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 7a21b3c4a1..c7819d4932 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -30,79 +30,25 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
-CORE_DIR="$FWDIR/core"
-REPL_DIR="$FWDIR/repl"
-REPL_BIN_DIR="$FWDIR/repl-bin"
-EXAMPLES_DIR="$FWDIR/examples"
-BAGEL_DIR="$FWDIR/bagel"
-MLLIB_DIR="$FWDIR/mllib"
-TOOLS_DIR="$FWDIR/tools"
-YARN_DIR="$FWDIR/yarn"
-STREAMING_DIR="$FWDIR/streaming"
-PYSPARK_DIR="$FWDIR/python"
-
# Build up classpath
-CLASSPATH="$SPARK_CLASSPATH"
-
-function dev_classpath {
- CLASSPATH="$CLASSPATH:$FWDIR/conf"
- CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
- if [ -n "$SPARK_TESTING" ] ; then
- CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
- fi
- CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
- CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
- if [ -e "$FWDIR/lib_managed" ]; then
- CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
- CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
- fi
- CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
- # Add the shaded JAR for Maven builds
- if [ -e $REPL_BIN_DIR/target ]; then
- for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded.jar'`; do
- CLASSPATH="$CLASSPATH:$jar"
- done
- # The shaded JAR doesn't contain examples, so include those separately
- for jar in `find "$EXAMPLES_DIR/target" -name 'spark-examples*[0-9T].jar'`; do
- CLASSPATH="$CLASSPATH:$jar"
- done
- fi
- CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$TOOLS_DIR/target/scala-$SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$YARN_DIR/target/scala-$SCALA_VERSION/classes"
- for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
- CLASSPATH="$CLASSPATH:$jar"
- done
-
- # Add Scala standard library
- if [ -z "$SCALA_LIBRARY_PATH" ]; then
- if [ -z "$SCALA_HOME" ]; then
- echo "SCALA_HOME is not set" >&2
- exit 1
- fi
- SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
- fi
- CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
- CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
- CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
-}
-
-function release_classpath {
- CLASSPATH="$CLASSPATH:$FWDIR/jars/*"
-}
-
+CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
if [ -f "$FWDIR/RELEASE" ]; then
- release_classpath
+ ASSEMBLY_JAR=`ls "$FWDIR"/jars/spark-assembly*.jar`
else
- dev_classpath
+ ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar`
+fi
+CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
+
+# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
+if [[ $SPARK_TESTING == 1 ]]; then
+ CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes"
fi
-# Add hadoop conf dir - else FileSystem.*, etc fail !
+# Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail !
# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
# the configurtion files.
if [ "x" != "x$HADOOP_CONF_DIR" ]; then
diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh
index 96c71e66ca..eac0774669 100755
--- a/bin/spark-daemon.sh
+++ b/bin/spark-daemon.sh
@@ -87,7 +87,7 @@ TEST_LOG_DIR=$?
if [ "${TEST_LOG_DIR}" = "0" ]; then
rm -f $SPARK_LOG_DIR/.spark_test
else
- chown $SPARK_IDENT_STRING $SPARK_LOG_DIR
+ chown $SPARK_IDENT_STRING $SPARK_LOG_DIR
fi
if [ "$SPARK_PID_DIR" = "" ]; then
@@ -109,7 +109,7 @@ fi
case $startStop in
(start)
-
+
mkdir -p "$SPARK_PID_DIR"
if [ -f $pid ]; then
@@ -128,11 +128,11 @@ case $startStop in
echo starting $command, logging to $log
echo "Spark Daemon: $command" > $log
cd "$SPARK_PREFIX"
- nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/run $command "$@" >> "$log" 2>&1 < /dev/null &
+ nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
echo $! > $pid
sleep 1; head "$log"
;;
-
+
(stop)
if [ -f $pid ]; then
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 23dfbcd604..7ce9505b9c 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -102,7 +102,7 @@ class SparkContext(
System.setProperty("spark.driver.port", "0")
}
- private val isLocal = (master == "local" || master.startsWith("local["))
+ val isLocal = (master == "local" || master.startsWith("local["))
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
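The `SparkContext` change above simply widens `isLocal` from private to public. A hedged sketch of how calling code might read it (the wrapper object is illustrative):

    import spark.SparkContext

    object LocalityCheckSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[2]", "LocalityCheckSketch")
        // After this change, isLocal is publicly readable rather than private.
        if (sc.isLocal) {
          println("Master is local or local[N]; skipping cluster-only setup")
        }
        sc.stop()
      }
    }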
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 5e53d95ac2..34665ce451 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -153,6 +153,7 @@ private[spark] class ExecutorRunner(
// Launch the process
val command = buildCommandSeq()
+ logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- appDesc.command.environment) {
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 036c7191ad..fa82d2b324 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -60,6 +60,13 @@ private[spark] class Executor(
System.setProperty(key, value)
}
+ // If we are in yarn mode, systems can have different disk layouts so we must set it
+ // to what Yarn on this system said was available. This will be used later when SparkEnv
+ // created.
+ if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
+ System.setProperty("spark.local.dir", getYarnLocalDirs())
+ }
+
// Create our ClassLoader and set it on this thread
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
@@ -107,6 +114,21 @@ private[spark] class Executor(
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}
+ /** Get the Yarn approved local directories. */
+ private def getYarnLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so lets check both. We assume one of the 2 is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+ .getOrElse(""))
+
+ if (localDirs.isEmpty()) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ return localDirs
+ }
+
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {
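The `getYarnLocalDirs()` helper added above prefers `YARN_LOCAL_DIRS` (Hadoop 0.23.x) and falls back to `LOCAL_DIRS` (Hadoop 2.x). As a side note, roughly the same fallback can be written with `Option.orElse`; this is only a sketch, not part of the commit:

    // Roughly equivalent to the nested getOrElse chain in getYarnLocalDirs():
    // prefer YARN_LOCAL_DIRS (Hadoop 0.23.x), else LOCAL_DIRS (Hadoop 2.x).
    val localDirs: String = Option(System.getenv("YARN_LOCAL_DIRS"))
      .orElse(Option(System.getenv("LOCAL_DIRS")))
      .filter(_.nonEmpty)
      .getOrElse(throw new Exception("Yarn Local dirs can't be empty"))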
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 6c41b97780..e512423fd6 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.util.ReflectionUtils
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import spark.util.NextIterator
-import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.conf.{Configuration, Configurable}
/**
@@ -132,4 +132,6 @@ class HadoopRDD[K, V](
override def checkpoint() {
// Do nothing. Hadoop RDD should not be checkpointed.
}
+
+ def getConf: Configuration = confBroadcast.value.value
}
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 184685528e..b1877dc06e 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -120,4 +120,7 @@ class NewHadoopRDD[K, V](
val theSplit = split.asInstanceOf[NewHadoopPartition]
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
}
+
+ def getConf: Configuration = confBroadcast.value.value
}
+
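The new `getConf` accessors on `HadoopRDD` and `NewHadoopRDD` expose the broadcast Hadoop `Configuration` an RDD was built with. A hedged usage sketch against the pre-rename `spark` package and the old `mapred` API (the input path and the property read at the end are placeholders):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import spark.SparkContext
    import spark.rdd.HadoopRDD

    object HadoopConfSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "HadoopConfSketch")
        // hadoopFile constructs a HadoopRDD under the hood, so the cast below
        // is a sketch-level shortcut to reach the new getConf accessor.
        val rdd = sc.hadoopFile("hdfs:///path/to/input", classOf[TextInputFormat],
          classOf[LongWritable], classOf[Text])
        val conf = rdd.asInstanceOf[HadoopRDD[LongWritable, Text]].getConf
        println("fs.default.name = " + conf.get("fs.default.name"))
        sc.stop()
      }
    }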
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 6ebbb5ec9b..eef3ee1425 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -125,14 +125,14 @@ private[spark] class CoarseMesosSchedulerBackend(
StandaloneSchedulerBackend.ACTOR_NAME)
val uri = System.getProperty("spark.executor.uri")
if (uri == null) {
- val runScript = new File(sparkHome, "run").getCanonicalPath
+ val runScript = new File(sparkHome, "spark-class").getCanonicalPath
command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ command.setValue("cd %s*; ./spark-class spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 2a2e178550..b7a981d101 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -372,12 +372,12 @@ object BlockManagerMasterActor {
if (blockStatus.storageLevel.useMemory) {
_remainingMem += blockStatus.memSize
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
+ blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
Utils.bytesToString(_remainingMem)))
}
if (blockStatus.storageLevel.useDisk) {
logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
}
}
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index f96419520f..5ff0572f0a 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -35,7 +35,7 @@ private[spark] object UIWorkloadGenerator {
def main(args: Array[String]) {
if (args.length < 2) {
- println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+ println("usage: ./spark-class spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
val master = args(0)
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala
index ed16b9d8ef..553c0309f6 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts {
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
failAfter(30 seconds) {
- Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master),
+ Utils.execute(Seq("./spark-class", "spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
}
}
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index 8a0fa42d94..c526da3ca0 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -158,4 +158,4 @@ trait Message[K] {
## Where to Go from Here
-Two example jobs, PageRank and shortest path, are included in `bagel/src/main/scala/spark/bagel/examples`. You can run them by passing the class name to the `run` script included in Spark -- for example, `./run spark.bagel.examples.WikipediaPageRank`. Each example program prints usage help when run without any arguments.
+Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/spark/examples/bagel`. You can run them by passing the class name to the `run-example` script included in Spark -- for example, `./run-example spark.examples.bagel.WikipediaPageRank`. Each example program prints usage help when run without any arguments.
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index a9f2cb8a7a..7ecb601ddd 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -8,53 +8,51 @@ title: Building Spark with Maven
Building Spark using Maven Requires Maven 3 (the build process is tested with Maven 3.0.4) and Java 1.6 or newer.
-## Specifying the Hadoop version ##
-To enable support for HDFS and other Hadoop-supported storage systems, specify the exact Hadoop version by setting the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default.
+## Setting up Maven's Memory Usage ##
-For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions without YARN, use:
+You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`. We recommend the following settings:
- # Apache Hadoop 1.2.1
- $ mvn -Dhadoop.version=1.2.1 clean install
+ export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
- # Cloudera CDH 4.2.0 with MapReduce v1
- $ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 clean install
+If you don't run this, you may see errors like the following:
-For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, enable the "hadoop2-yarn" profile:
-
- # Apache Hadoop 2.0.5-alpha
- $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha clean install
+ [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
+ [ERROR] PermGen space -> [Help 1]
- # Cloudera CDH 4.2.0 with MapReduce v2
- $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 clean install
+ [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
+ [ERROR] Java heap space -> [Help 1]
+You can fix this by setting the `MAVEN_OPTS` variable as discussed before.
-## Spark Tests in Maven ##
+## Specifying the Hadoop version ##
-Tests are run by default via the scalatest-maven-plugin. With this you can do things like:
+Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default.
-Skip test execution (but not compilation):
+For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions without YARN, use:
- $ mvn -Dhadoop.version=... -DskipTests clean install
+ # Apache Hadoop 1.2.1
+ $ mvn -Dhadoop.version=1.2.1 -DskipTests clean package
-To run a specific test suite:
+ # Cloudera CDH 4.2.0 with MapReduce v1
+ $ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
- $ mvn -Dhadoop.version=... -Dsuites=spark.repl.ReplSuite test
+For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should also enable the "hadoop2-yarn" profile:
+ # Apache Hadoop 2.0.5-alpha
+ $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -DskipTests clean package
-## Setting up JVM Memory Usage Via Maven ##
+ # Cloudera CDH 4.2.0 with MapReduce v2
+ $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -DskipTests clean package
-You might run into the following errors if you're using a vanilla installation of Maven:
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
- [ERROR] PermGen space -> [Help 1]
+## Spark Tests in Maven ##
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
- [ERROR] Java heap space -> [Help 1]
+Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin). Some of them require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. You can then run the tests with `mvn -Dhadoop.version=... test`.
-To fix these, you can do the following:
+The ScalaTest plugin also supports running only a specific test suite as follows:
- export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=128M"
+ $ mvn -Dhadoop.version=... -Dsuites=spark.repl.ReplSuite test
## Continuous Compilation ##
@@ -63,8 +61,7 @@ We use the scala-maven-plugin which supports incremental and continuous compilat
$ mvn scala:cc
-…should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
-
+should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
## Using With IntelliJ IDEA ##
@@ -72,8 +69,8 @@ This setup works fine in IntelliJ IDEA 11.1.4. After opening the project via the
## Building Spark Debian Packages ##
-It includes support for building a Debian package containing a 'fat-jar' which includes the repl, the examples and bagel. This can be created by specifying the deb profile:
+It includes support for building a Debian package containing a 'fat-jar' which includes the repl, the examples and bagel. This can be created by specifying the following profiles:
- $ mvn -Pdeb clean install
+ $ mvn -Prepl-bin -Pdeb clean package
The debian package can then be found under repl/target. We added the short commit hash to the file name so that we can distinguish individual packages build for SNAPSHOT versions.
diff --git a/docs/index.md b/docs/index.md
index 0c4add45dc..ec9c7dd4f3 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -20,16 +20,16 @@ of these methods on slave nodes on your cluster.
Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
- sbt/sbt package
+ sbt/sbt assembly
Spark also supports building using Maven. If you would like to build using Maven, see the [instructions for building Spark with Maven](building-with-maven.html).
# Testing the Build
Spark comes with a number of sample programs in the `examples` directory.
-To run one of the samples, use `./run <class> <params>` in the top-level Spark directory
+To run one of the samples, use `./run-example <class> <params>` in the top-level Spark directory
(the `run` script sets up the appropriate paths and launches that program).
-For example, `./run spark.examples.SparkPi` will run a sample program that estimates Pi. Each of the
+For example, `./run-example spark.examples.SparkPi` will run a sample program that estimates Pi. Each of the
examples prints usage help if no params are given.
Note that all of the sample programs take a `<master>` parameter specifying the cluster URL
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md
index ae8257b539..dd19a5f0c9 100644
--- a/docs/java-programming-guide.md
+++ b/docs/java-programming-guide.md
@@ -190,6 +190,6 @@ We hope to generate documentation with Java-style syntax in the future.
Spark includes several sample programs using the Java API in
[`examples/src/main/java`](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/examples). You can run them by passing the class name to the
-`run` script included in Spark -- for example, `./run
+`run-example` script included in Spark -- for example, `./run-example
spark.examples.JavaWordCount`. Each example program prints usage help when run
without any arguments.
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 794bff5647..15d3ebfcae 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -70,7 +70,7 @@ The script automatically adds the `pyspark` package to the `PYTHONPATH`.
The `pyspark` script launches a Python interpreter that is configured to run PySpark jobs. To use `pyspark` interactively, first build Spark, then launch it directly from the command line without any options:
{% highlight bash %}
-$ sbt/sbt package
+$ sbt/sbt assembly
$ ./pyspark
{% endhighlight %}
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 335643536a..4e9deadbaa 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -12,7 +12,7 @@ See the [programming guide](scala-programming-guide.html) for a more complete re
To follow along with this guide, you only need to have successfully built Spark on one machine. Simply go into your Spark directory and run:
{% highlight bash %}
-$ sbt/sbt package
+$ sbt/sbt assembly
{% endhighlight %}
# Interactive Analysis with the Spark Shell
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 6bada9bdd7..678cd57aba 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -15,9 +15,9 @@ We need a consolidated spark core jar (which bundles all the required dependenci
This can be built either through sbt or via maven.
- Building spark assembled jar via sbt.
-Enable YARN support by setting `SPARK_WITH_YARN=true` when invoking sbt:
+Enable YARN support by setting `SPARK_YARN=true` when invoking sbt:
- SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true ./sbt/sbt clean assembly
+ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true ./sbt/sbt clean assembly
The assembled jar would typically be something like :
`./yarn/target/spark-yarn-assembly-0.8.0-SNAPSHOT.jar`
@@ -42,6 +42,12 @@ This will build the shaded (consolidated) jar. Typically something like :
If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}` file can be generated by running `sbt/sbt package`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.
+# Configuration
+
+Most of the configs are the same for Spark on YARN as for other deploys. See the Configuration page for more information on those. The following configs are specific to Spark on YARN.
+
+* `SPARK_YARN_USER_ENV`, to add environment variables to the Spark processes launched on YARN. This can be a comma-separated list of environment variables, e.g. SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"
+
# Launching Spark on YARN
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
@@ -49,7 +55,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t
The command to launch the YARN Client is as follows:
- SPARK_JAR=<SPARK_YARN_JAR_FILE> ./run spark.deploy.yarn.Client \
+ SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class spark.deploy.yarn.Client \
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
@@ -57,12 +63,11 @@ The command to launch the YARN Client is as follows:
--master-memory <MEMORY_FOR_MASTER> \
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
- --user <hadoop_user> \
--queue <queue_name>
For example:
- SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \
+ SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
--class spark.examples.SparkPi \
--args yarn-standalone \
@@ -77,5 +82,4 @@ The above starts a YARN Client programs which periodically polls the Application
- When your application instantiates a Spark context it must use a special "yarn-standalone" master url. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above.
- We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
-- Currently, we have not yet integrated with hadoop security. If --user is present, the hadoop_user specified will be used to run the tasks on the cluster. If unspecified, current user will be used (which should be valid in cluster).
- Once hadoop security support is added, and if hadoop cluster is enabled with security, additional restrictions would apply via delegation tokens passed.
+- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
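The guide above stresses that an application running under YARN must use the special "yarn-standalone" master URL, typically passed in as a program argument. A hedged sketch of that pattern (the object name and job body are illustrative):

    import spark.SparkContext

    object YarnStandaloneSketch {
      def main(args: Array[String]) {
        // Pass "yarn-standalone" as the first argument when launching through
        // spark.deploy.yarn.Client, or a local master when testing on one machine.
        val master = if (args.length > 0) args(0) else "local"
        val sc = new SparkContext(master, "YarnStandaloneSketch")
        val count = sc.parallelize(1 to 1000).count()
        println("Counted " + count + " elements")
        sc.stop()
      }
    }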
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index e9cf9ef36f..db584d2096 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -356,7 +356,7 @@ res2: Int = 10
# Where to Go from Here
You can see some [example Spark programs](http://www.spark-project.org/examples.html) on the Spark website.
-In addition, Spark includes several sample programs in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `run` script included in Spark -- for example, `./run spark.examples.SparkPi`. Each example program prints usage help when run without any arguments.
+In addition, Spark includes several sample programs in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them by passing the class name to the `run-example` script included in Spark -- for example, `./run-example spark.examples.SparkPi`. Each example program prints usage help when run without any arguments.
For help on optimizing your program, the [configuration](configuration.html) and
[tuning](tuning.html) guides provide information on best practices. They are especially important for
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 7463844a4e..bb8be276c5 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -20,7 +20,7 @@ Compile Spark with `sbt package` as described in the [Getting Started Guide](ind
You can start a standalone master server by executing:
- ./run spark.deploy.master.Master
+ ./spark-class spark.deploy.master.Master
Once started, the master will print out a `spark://IP:PORT` URL for itself, which you can use to connect workers to it,
or pass as the "master" argument to `SparkContext` to connect a job to the cluster. You can also find this URL on
@@ -28,7 +28,7 @@ the master's web UI, which is [http://localhost:8080](http://localhost:8080) by
Similarly, you can start one or more workers and connect them to the master via:
- ./run spark.deploy.worker.Worker spark://IP:PORT
+ ./spark-class spark.deploy.worker.Worker spark://IP:PORT
Once you have started a worker, look at the master's web UI ([http://localhost:8080](http://localhost:8080) by default).
You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index a74c17bdb7..3330e63598 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -234,7 +234,7 @@ $ nc -lk 9999
Then, in a different terminal, you can start NetworkWordCount by using
{% highlight bash %}
-$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999
+$ ./run-example spark.streaming.examples.NetworkWordCount local[2] localhost 9999
{% endhighlight %}
This will make NetworkWordCount connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen.
@@ -272,7 +272,7 @@ Time: 1357008430000 ms
</td>
</table>
-You can find more examples in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/`. They can be run in the similar manner using `./run spark.streaming.examples....` . Executing without any parameter would give the required parameter list. Further explanation to run them can be found in comments in the files.
+You can find more examples in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/`. They can be run in a similar manner using `./run-example spark.streaming.examples....`. Executing them without any parameters prints the required parameter list. Further explanation of how to run them can be found in comments in the files.
# DStream Persistence
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using `persist()` method on a DStream would automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`.
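For reference alongside the `run-example` invocations above, a hedged sketch of a NetworkWordCount-style program against the 0.8-era `spark.streaming` API (the object name and batch interval are illustrative):

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._

    object NetworkWordCountSketch {
      def main(args: Array[String]) {
        // Expected arguments: <master> <hostname> <port>, e.g. local[2] localhost 9999
        val Array(master, host, port) = args
        val ssc = new StreamingContext(master, "NetworkWordCountSketch", Seconds(1))
        val words = ssc.socketTextStream(host, port.toInt).flatMap(_.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
      }
    }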
diff --git a/examples/pom.xml b/examples/pom.xml
index 0db52b8691..687fbcca8f 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -36,16 +36,25 @@
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-bagel</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
@@ -63,10 +72,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
@@ -121,13 +126,63 @@
</exclusions>
</dependency>
</dependencies>
+
+ <profiles>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-yarn</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <outputFile>${project.build.directory}/scala-${scala.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
</plugins>
</build>
diff --git a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala b/examples/src/main/scala/spark/examples/bagel/PageRankUtils.scala
index de65e27fe0..c23ee9895f 100644
--- a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
+++ b/examples/src/main/scala/spark/examples/bagel/PageRankUtils.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.bagel.examples
+package spark.examples.bagel
import spark._
import spark.SparkContext._
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRank.scala
index a0c5ac9c18..00635a7ffa 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
+++ b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRank.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.bagel.examples
+package spark.examples.bagel
import spark._
import spark.SparkContext._
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 3c54a85f42..c416ddbc58 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.bagel.examples
+package spark.examples.bagel
import spark._
import serializer.{DeserializationStream, SerializationStream, SerializerInstance}
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index f97174aeae..05d3176478 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -132,9 +132,9 @@ object FeederActor {
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
- * `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
+ * `$ ./run-example spark.streaming.examples.FeederActor 127.0.1.1 9999`
* and then run the example
- * `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ * `$ ./run-example spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
index f5baec242d..30af01a26f 100644
--- a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
@@ -28,7 +28,7 @@ import spark.streaming.StreamingContext._
* <directory> is the directory that Spark Streaming will use to find and read new text files.
*
* To run this on your local machine on directory `localdir`, run this example
- * `$ ./run spark.streaming.examples.HdfsWordCount local[2] localdir`
+ * `$ ./run-example spark.streaming.examples.HdfsWordCount local[2] localdir`
* Then create a text file in `localdir` and the words in the file will get counted.
*/
object HdfsWordCount {
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 4929703ba2..d9c76d1a33 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -37,7 +37,7 @@ import spark.streaming.util.RawTextHelper._
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
- * `./run spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ * `./run-example spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
*/
object KafkaWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 150fb5eb9c..b29d79aac5 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -29,7 +29,7 @@ import spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ * `$ ./run-example spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala
index 33ab324732..b709fc3c87 100644
--- a/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -29,7 +29,7 @@ import spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
+ * `$ ./run-example spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
index e264fae609..a0cae06c30 100644
--- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -60,9 +60,9 @@ object SimpleZeroMQPublisher {
* <zeroMQurl> and <topic> describe where zeroMq publisher is running.
*
* To run this example locally, you may run publisher as
- * `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * `$ ./run-example spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and run the example as
- * `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ * `$ ./run-example spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
object ZeroMQWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
index 375d5c9d22..dd36bbbf32 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -37,8 +37,8 @@ object PageView {
/** Generates streaming events to simulate page views on a website.
*
* This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
* */
object PageViewGenerator {
val pages = Map("http://foo.com/" -> .7,
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index a24342bebf..152da23489 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -25,9 +25,9 @@ import spark.SparkContext._
* operators available in Spark streaming.
*
* This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
- * */
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ */
object PageViewStream {
def main(args: Array[String]) {
if (args.length != 3) {
diff --git a/make-distribution.sh b/make-distribution.sh
index 70aff418c7..91f6278491 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -47,7 +47,7 @@ VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z
# Initialize defaults
SPARK_HADOOP_VERSION=1.0.4
-SPARK_WITH_YARN=false
+SPARK_YARN=false
MAKE_TGZ=false
# Parse arguments
@@ -58,7 +58,7 @@ while (( "$#" )); do
shift
;;
--with-yarn)
- SPARK_WITH_YARN=true
+ SPARK_YARN=true
;;
--tgz)
MAKE_TGZ=true
@@ -74,7 +74,7 @@ else
fi
echo "Hadoop version set to $SPARK_HADOOP_VERSION"
-if [ "$SPARK_WITH_YARN" == "true" ]; then
+if [ "$SPARK_YARN" == "true" ]; then
echo "YARN enabled"
else
echo "YARN disabled"
@@ -82,22 +82,26 @@ fi
# Build fat JAR
export SPARK_HADOOP_VERSION
-export SPARK_WITH_YARN
-"$FWDIR/sbt/sbt" "repl/assembly"
+export SPARK_YARN
+"$FWDIR/sbt/sbt" "assembly/assembly"
# Make directories
rm -rf "$DISTDIR"
mkdir -p "$DISTDIR/jars"
-echo "$VERSION" > "$DISTDIR/RELEASE"
+echo "Spark $VERSION built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE"
# Copy jars
-cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
+cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/jars/"
# Copy other things
+mkdir "$DISTDIR"/conf
+cp -r "$FWDIR/conf/*.template" "$DISTDIR"
cp -r "$FWDIR/bin" "$DISTDIR"
-cp -r "$FWDIR/conf" "$DISTDIR"
-cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+cp -r "$FWDIR/python" "$DISTDIR"
+cp "$FWDIR/spark-class" "$DISTDIR"
+cp "$FWDIR/spark-shell" "$DISTDIR"
cp "$FWDIR/spark-executor" "$DISTDIR"
+cp "$FWDIR/pyspark" "$DISTDIR"
if [ "$MAKE_TGZ" == "true" ]; then
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 30ee0ab0ff..482e4a6745 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -17,12 +17,13 @@
package spark.mllib.classification
+import scala.math.round
+
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
import spark.mllib.regression._
import spark.mllib.util.MLUtils
-
-import scala.math.round
+import spark.mllib.util.DataValidators
import org.jblas.DoubleMatrix
@@ -47,26 +48,29 @@ class LogisticRegressionModel(
/**
* Train a classification model for Logistic Regression using Stochastic Gradient Descent.
+ * NOTE: Labels used in Logistic Regression should be {0, 1}
*/
class LogisticRegressionWithSGD private (
var stepSize: Double,
var numIterations: Int,
var regParam: Double,
- var miniBatchFraction: Double,
- var addIntercept: Boolean)
+ var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
with Serializable {
val gradient = new LogisticGradient()
val updater = new SimpleUpdater()
- val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
- .setNumIterations(numIterations)
- .setRegParam(regParam)
- .setMiniBatchFraction(miniBatchFraction)
+ override val optimizer = new GradientDescent(gradient, updater)
+ .setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
+ override val validators = List(DataValidators.classificationLabels)
+
/**
* Construct a LogisticRegression object with default parameters
*/
- def this() = this(1.0, 100, 0.0, 1.0, true)
+ def this() = this(1.0, 100, 0.0, 1.0)
def createModel(weights: Array[Double], intercept: Double) = {
new LogisticRegressionModel(weights, intercept)
@@ -75,6 +79,7 @@ class LogisticRegressionWithSGD private (
/**
* Top-level methods for calling Logistic Regression.
+ * NOTE: Labels used in Logistic Regression should be {0, 1}
*/
object LogisticRegressionWithSGD {
// NOTE(shivaram): We use multiple train methods instead of default arguments to support
@@ -85,6 +90,7 @@ object LogisticRegressionWithSGD {
* number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
* gradient descent are initialized using the initial weights provided.
+ * NOTE: Labels used in Logistic Regression should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
@@ -101,7 +107,7 @@ object LogisticRegressionWithSGD {
initialWeights: Array[Double])
: LogisticRegressionModel =
{
- new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
input, initialWeights)
}
@@ -109,6 +115,7 @@ object LogisticRegressionWithSGD {
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
+ * NOTE: Labels used in Logistic Regression should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
@@ -123,7 +130,7 @@ object LogisticRegressionWithSGD {
miniBatchFraction: Double)
: LogisticRegressionModel =
{
- new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
input)
}
@@ -131,6 +138,7 @@ object LogisticRegressionWithSGD {
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. We use the entire data
* set to update the gradient in each iteration.
+ * NOTE: Labels used in Logistic Regression should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
@@ -151,6 +159,7 @@ object LogisticRegressionWithSGD {
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using a step size of 1.0. We use the entire data set
* to update the gradient in each iteration.
+ * NOTE: Labels used in Logistic Regression should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
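For orientation, a minimal Scala sketch of driving the updated trainer (the object name and input path are placeholders; the file is assumed to be in the (label, features) text format read by MLUtils.loadLabeledData, with labels restricted to 0.0 or 1.0 as the new validator requires):

    import spark.SparkContext
    import spark.mllib.classification.LogisticRegressionWithSGD
    import spark.mllib.util.MLUtils

    object LogisticRegressionSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LogisticRegressionSketch")
        // Labels must be 0.0 or 1.0; anything else now fails input validation in run().
        val points = MLUtils.loadLabeledData(sc, "/tmp/classification-data.txt")
        val model = LogisticRegressionWithSGD.train(points, 100)
        // Predictions come back as 0.0 or 1.0, matching the label convention.
        val predictions = model.predict(points.map(_.features))
        println("predicted positives: " + predictions.filter(_ == 1.0).count())
        sc.stop()
      }
    }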
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index f799cb2829..69393cd7b0 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -18,10 +18,12 @@
package spark.mllib.classification
import scala.math.signum
+
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
import spark.mllib.regression._
import spark.mllib.util.MLUtils
+import spark.mllib.util.DataValidators
import org.jblas.DoubleMatrix
@@ -39,31 +41,36 @@ class SVMModel(
override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
intercept: Double) = {
- signum(dataMatrix.dot(weightMatrix) + intercept)
+ val margin = dataMatrix.dot(weightMatrix) + intercept
+ if (margin < 0) 0.0 else 1.0
}
}
/**
* Train an SVM using Stochastic Gradient Descent.
+ * NOTE: Labels used in SVM should be {0, 1}
*/
class SVMWithSGD private (
var stepSize: Double,
var numIterations: Int,
var regParam: Double,
- var miniBatchFraction: Double,
- var addIntercept: Boolean)
+ var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
val gradient = new HingeGradient()
val updater = new SquaredL2Updater()
- val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
- .setNumIterations(numIterations)
- .setRegParam(regParam)
- .setMiniBatchFraction(miniBatchFraction)
+ override val optimizer = new GradientDescent(gradient, updater)
+ .setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
+
+ override val validators = List(DataValidators.classificationLabels)
+
/**
* Construct a SVM object with default parameters
*/
- def this() = this(1.0, 100, 1.0, 1.0, true)
+ def this() = this(1.0, 100, 1.0, 1.0)
def createModel(weights: Array[Double], intercept: Double) = {
new SVMModel(weights, intercept)
@@ -71,7 +78,7 @@ class SVMWithSGD private (
}
/**
- * Top-level methods for calling SVM.
+ * Top-level methods for calling SVM. NOTE: Labels used in SVM should be {0, 1}
*/
object SVMWithSGD {
@@ -80,6 +87,7 @@ object SVMWithSGD {
* of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
* gradient descent are initialized using the initial weights provided.
+ * NOTE: Labels used in SVM should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
@@ -98,7 +106,7 @@ object SVMWithSGD {
initialWeights: Array[Double])
: SVMModel =
{
- new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
initialWeights)
}
@@ -106,6 +114,7 @@ object SVMWithSGD {
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
+ * NOTE: Labels used in SVM should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
@@ -121,13 +130,14 @@ object SVMWithSGD {
miniBatchFraction: Double)
: SVMModel =
{
- new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
}
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. We use the entire data set to
* update the gradient in each iteration.
+ * NOTE: Labels used in SVM should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
@@ -149,6 +159,7 @@ object SVMWithSGD {
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using a step size of 1.0. We use the entire data set to
* update the gradient in each iteration.
+ * NOTE: Labels used in SVM should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
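A similarly minimal sketch on the SVM side (object name and path are placeholders); with the new predictPoint, both the training labels and the model output follow the {0, 1} convention:

    import spark.SparkContext
    import spark.mllib.classification.SVMWithSGD
    import spark.mllib.util.MLUtils

    object SVMSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SVMSketch")
        val points = MLUtils.loadLabeledData(sc, "/tmp/classification-data.txt")
        // train(input, numIterations, stepSize, regParam) uses the full data set each iteration.
        val model = SVMWithSGD.train(points, 100, 1.0, 0.1)
        // Predictions are 0.0 or 1.0 depending on the sign of the margin.
        val predictions = model.predict(points.map(_.features))
        println("predicted 1.0 for " + predictions.filter(_ == 1.0).count() +
          " of " + points.count() + " points")
        sc.stop()
      }
    }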
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
index e72b8b3a92..05568f55af 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -77,16 +77,22 @@ class SquaredGradient extends Gradient {
/**
* Compute gradient and loss for a Hinge loss function.
+ * NOTE: This assumes that the labels are {0,1}
*/
class HingeGradient extends Gradient {
- override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+ override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {
val dotProduct = data.dot(weights)
- if (1.0 > label * dotProduct)
- (data.mul(-label), 1.0 - label * dotProduct)
- else
- (DoubleMatrix.zeros(1,weights.length), 0.0)
+      // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
+ // Therefore the gradient is -(2y - 1)*x
+ val labelScaled = 2 * label - 1.0
+
+ if (1.0 > labelScaled * dotProduct) {
+ (data.mul(-labelScaled), 1.0 - labelScaled * dotProduct)
+ } else {
+ (DoubleMatrix.zeros(1, weights.length), 0.0)
+ }
}
}
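A toy walk-through of that formula with made-up numbers (purely illustrative, reusing the same jblas calls as the gradient above): a label of 0 maps to a scaled label of -1 and a label of 1 maps to +1, so max(0, 1 - (2y - 1) * w.x) recovers the usual +1/-1 hinge loss.

    import org.jblas.DoubleMatrix

    object HingeGradientSketch {
      def main(args: Array[String]) {
        val weights = new DoubleMatrix(1, 2, 0.5, -0.25)
        val data = new DoubleMatrix(1, 2, 1.0, 2.0)
        val label = 0.0                            // {0, 1} label
        val labelScaled = 2 * label - 1.0          // -1.0
        val dotProduct = data.dot(weights)         // 0.5 * 1.0 + (-0.25) * 2.0 = 0.0
        if (1.0 > labelScaled * dotProduct) {
          val gradient = data.mul(-labelScaled)        // (1.0, 2.0)
          val loss = 1.0 - labelScaled * dotProduct    // 1.0
          println("gradient = " + gradient + ", loss = " + loss)
        } else {
          println("point is outside the margin, gradient is zero")
        }
      }
    }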
diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 4ecafff08b..d164d415d6 100644
--- a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -17,7 +17,7 @@
package spark.mllib.regression
-import spark.{Logging, RDD}
+import spark.{Logging, RDD, SparkException}
import spark.mllib.optimization._
import org.jblas.DoubleMatrix
@@ -83,15 +83,19 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
extends Logging with Serializable {
+ protected val validators: Seq[RDD[LabeledPoint] => Boolean] = List()
+
val optimizer: Optimizer
+ protected var addIntercept: Boolean = true
+
+ protected var validateData: Boolean = true
+
/**
* Create a model given the weights and intercept
*/
protected def createModel(weights: Array[Double], intercept: Double): M
- protected var addIntercept: Boolean
-
/**
* Set if the algorithm should add an intercept. Default true.
*/
@@ -101,6 +105,14 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
}
/**
+ * Set if the algorithm should validate data before training. Default true.
+ */
+ def setValidateData(validateData: Boolean): this.type = {
+ this.validateData = validateData
+ this
+ }
+
+ /**
* Run the algorithm with the configured parameters on an input
* RDD of LabeledPoint entries.
*/
@@ -116,6 +128,11 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
*/
def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {
+ // Check the data properties before running the optimizer
+ if (validateData && !validators.forall(func => func(input))) {
+ throw new SparkException("Input validation failed.")
+ }
+
// Add a extra variable consisting of all 1.0's for the intercept.
val data = if (addIntercept) {
input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*)))
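A short sketch of how a caller interacts with the new knobs (object name and path are placeholders): run() now rejects bad input up front unless validation is switched off explicitly.

    import spark.SparkContext
    import spark.mllib.classification.LogisticRegressionWithSGD
    import spark.mllib.util.MLUtils

    object ValidationSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "ValidationSketch")
        val points = MLUtils.loadLabeledData(sc, "/tmp/classification-data.txt")

        val lr = new LogisticRegressionWithSGD()
        lr.optimizer.setStepSize(1.0).setNumIterations(50)

        // Throws SparkException if any registered validator rejects the input,
        // e.g. a classification label that is neither 0.0 nor 1.0.
        val checkedModel = lr.run(points)

        // Skips the validators entirely when the data is already known to be clean.
        val uncheckedModel = lr.setValidateData(false).run(points)
        sc.stop()
      }
    }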
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index 6bbc990a5a..0f33456ef4 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -48,25 +48,59 @@ class LassoWithSGD private (
var stepSize: Double,
var numIterations: Int,
var regParam: Double,
- var miniBatchFraction: Double,
- var addIntercept: Boolean)
+ var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[LassoModel]
with Serializable {
val gradient = new SquaredGradient()
val updater = new L1Updater()
- val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
- .setNumIterations(numIterations)
- .setRegParam(regParam)
- .setMiniBatchFraction(miniBatchFraction)
+ @transient val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
+
+ // We don't want to penalize the intercept, so set this to false.
+ setIntercept(false)
+
+ var yMean = 0.0
+ var xColMean: DoubleMatrix = _
+ var xColSd: DoubleMatrix = _
/**
* Construct a Lasso object with default parameters
*/
- def this() = this(1.0, 100, 1.0, 1.0, true)
+ def this() = this(1.0, 100, 1.0, 1.0)
def createModel(weights: Array[Double], intercept: Double) = {
- new LassoModel(weights, intercept)
+ val weightsMat = new DoubleMatrix(weights.length + 1, 1, (Array(intercept) ++ weights):_*)
+ val weightsScaled = weightsMat.div(xColSd)
+ val interceptScaled = yMean - (weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0))
+
+ new LassoModel(weightsScaled.data, interceptScaled)
+ }
+
+ override def run(
+ input: RDD[LabeledPoint],
+ initialWeights: Array[Double])
+ : LassoModel =
+ {
+ val nfeatures: Int = input.first.features.length
+ val nexamples: Long = input.count()
+
+ // To avoid penalizing the intercept, we center and scale the data.
+ val stats = MLUtils.computeStats(input, nfeatures, nexamples)
+ yMean = stats._1
+ xColMean = stats._2
+ xColSd = stats._3
+
+ val normalizedData = input.map { point =>
+ val yNormalized = point.label - yMean
+ val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
+ val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
+ LabeledPoint(yNormalized, featuresNormalized.toArray)
+ }
+
+ super.run(normalizedData, initialWeights)
}
}
@@ -98,7 +132,7 @@ object LassoWithSGD {
initialWeights: Array[Double])
: LassoModel =
{
- new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
initialWeights)
}
@@ -121,7 +155,7 @@ object LassoWithSGD {
miniBatchFraction: Double)
: LassoModel =
{
- new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
}
/**
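The rescaling in createModel is an ordinary change of variables: a weight learned against (x_j - mean_j) / sd_j becomes w_j / sd_j on the raw scale, and the intercept is recovered from the label mean. A toy sketch with hypothetical statistics (not part of the patch):

    import org.jblas.DoubleMatrix

    object RescaleSketch {
      def main(args: Array[String]) {
        // Hypothetical column statistics and weights learned on centered, scaled data.
        val yMean = 5.0
        val xColMean = new DoubleMatrix(2, 1, 1.0, 10.0)
        val xColSd = new DoubleMatrix(2, 1, 2.0, 4.0)
        val weightsNormalized = new DoubleMatrix(2, 1, 0.8, -0.4)

        // Same arithmetic as createModel: undo the feature scaling, then solve for the intercept.
        val weightsScaled = weightsNormalized.div(xColSd)
        val intercept = yMean - weightsNormalized.transpose().mmul(xColMean.div(xColSd)).get(0)
        println("weights = " + weightsScaled + ", intercept = " + intercept)
      }
    }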
diff --git a/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala
new file mode 100644
index 0000000000..885ff5a30d
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Regression model trained using LinearRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
+ */
+class LinearRegressionModel(
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with RegressionModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ dataMatrix.dot(weightMatrix) + intercept
+ }
+}
+
+/**
+ * Train a regression model with no regularization using Stochastic Gradient Descent.
+ */
+class LinearRegressionWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var miniBatchFraction: Double)
+ extends GeneralizedLinearAlgorithm[LinearRegressionModel]
+ with Serializable {
+
+ val gradient = new SquaredGradient()
+ val updater = new SimpleUpdater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setMiniBatchFraction(miniBatchFraction)
+
+ /**
+ * Construct a LinearRegression object with default parameters
+ */
+ def this() = this(1.0, 100, 1.0)
+
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LinearRegressionModel(weights, intercept)
+ }
+}
+
+/**
+ * Top-level methods for calling LinearRegression.
+ */
+object LinearRegressionWithSGD {
+
+ /**
+ * Train a Linear Regression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : LinearRegressionModel =
+ {
+ new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
+ initialWeights)
+ }
+
+ /**
+ * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double)
+ : LinearRegressionModel =
+ {
+ new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
+ }
+
+ /**
+ * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LinearRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double)
+ : LinearRegressionModel =
+ {
+ train(input, numIterations, stepSize, 1.0)
+ }
+
+ /**
+ * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LinearRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int)
+ : LinearRegressionModel =
+ {
+ train(input, numIterations, 1.0, 1.0)
+ }
+
+ def main(args: Array[String]) {
+    if (args.length != 4) {
+ println("Usage: LinearRegression <master> <input_dir> <step_size> <niters>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "LinearRegression")
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
+
+ sc.stop()
+ }
+}
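A usage sketch for the new class (object name is a placeholder; the synthetic data comes from the LinearDataGenerator introduced later in this change):

    import spark.SparkContext
    import spark.mllib.regression.LinearRegressionWithSGD
    import spark.mllib.util.LinearDataGenerator

    object LinearRegressionSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LinearRegressionSketch")
        // 1000 points, 10 features, small noise, known intercept of 0.5.
        val data = LinearDataGenerator.generateLinearRDD(sc, 1000, 10, 0.1, nparts = 2, intercept = 0.5)
        val model = LinearRegressionWithSGD.train(data, 100, 1.0)
        // Mean squared error on the training data.
        val mse = data.map { p =>
          val err = model.predict(p.features) - p.label
          err * err
        }.reduce(_ + _) / data.count()
        println("training MSE = " + mse)
        sc.stop()
      }
    }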
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
index b42d94af41..cb1303dd99 100644
--- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -18,200 +18,196 @@
package spark.mllib.regression
import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
-import org.jblas.Solve
-
-import scala.annotation.tailrec
-import scala.collection.mutable
/**
- * Ridge Regression from Joseph Gonzalez's implementation in MLBase
+ * Regression model trained using RidgeRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
*/
class RidgeRegressionModel(
- val weights: DoubleMatrix,
- val intercept: Double,
- val lambdaOpt: Double,
- val lambdas: Seq[(Double, Double, DoubleMatrix)])
- extends RegressionModel {
-
- override def predict(testData: RDD[Array[Double]]): RDD[Double] = {
- // A small optimization to avoid serializing the entire model.
- val localIntercept = this.intercept
- val localWeights = this.weights
- testData.map { x =>
- (new DoubleMatrix(1, x.length, x:_*).mmul(localWeights)).get(0) + localIntercept
- }
- }
-
- override def predict(testData: Array[Double]): Double = {
- (new DoubleMatrix(1, testData.length, testData:_*).mmul(this.weights)).get(0) + this.intercept
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with RegressionModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ dataMatrix.dot(weightMatrix) + intercept
}
}
-class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
- extends Logging {
+/**
+ * Train a regression model with L2-regularization using Stochastic Gradient Descent.
+ */
+class RidgeRegressionWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double)
+ extends GeneralizedLinearAlgorithm[RidgeRegressionModel]
+ with Serializable {
+
+ val gradient = new SquaredGradient()
+ val updater = new SquaredL2Updater()
- def this() = this(0.0, 100.0)
+ @transient val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
- /**
- * Set the lower bound on binary search for lambda's. Default is 0.
- */
- def setLowLambda(low: Double) = {
- this.lambdaLow = low
- this
- }
+ // We don't want to penalize the intercept in RidgeRegression, so set this to false.
+ setIntercept(false)
+
+ var yMean = 0.0
+ var xColMean: DoubleMatrix = _
+ var xColSd: DoubleMatrix = _
/**
- * Set the upper bound on binary search for lambda's. Default is 100.0.
+ * Construct a RidgeRegression object with default parameters
*/
- def setHighLambda(hi: Double) = {
- this.lambdaHigh = hi
- this
+ def this() = this(1.0, 100, 1.0, 1.0)
+
+ def createModel(weights: Array[Double], intercept: Double) = {
+ val weightsMat = new DoubleMatrix(weights.length + 1, 1, (Array(intercept) ++ weights):_*)
+ val weightsScaled = weightsMat.div(xColSd)
+ val interceptScaled = yMean - (weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0))
+
+ new RidgeRegressionModel(weightsScaled.data, interceptScaled)
}
- def train(inputLabeled: RDD[LabeledPoint]): RidgeRegressionModel = {
- val input = inputLabeled.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
- val nfeatures: Int = input.take(1)(0)._2.length
+ override def run(
+ input: RDD[LabeledPoint],
+ initialWeights: Array[Double])
+ : RidgeRegressionModel =
+ {
+ val nfeatures: Int = input.first.features.length
val nexamples: Long = input.count()
- val (yMean, xColMean, xColSd) = MLUtils.computeStats(input, nfeatures, nexamples)
+ // To avoid penalizing the intercept, we center and scale the data.
+ val stats = MLUtils.computeStats(input, nfeatures, nexamples)
+ yMean = stats._1
+ xColMean = stats._2
+ xColSd = stats._3
- val data = input.map { case(y, features) =>
- val yNormalized = y - yMean
- val featuresMat = new DoubleMatrix(nfeatures, 1, features:_*)
+ val normalizedData = input.map { point =>
+ val yNormalized = point.label - yMean
+ val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
- (yNormalized, featuresNormalized.toArray)
- }
-
- // Compute XtX - Size of XtX is nfeatures by nfeatures
- val XtX: DoubleMatrix = data.map { case (y, features) =>
- val x = new DoubleMatrix(1, features.length, features:_*)
- x.transpose().mmul(x)
- }.reduce(_.addi(_))
-
- // Compute Xt*y - Size of Xty is nfeatures by 1
- val Xty: DoubleMatrix = data.map { case (y, features) =>
- new DoubleMatrix(features.length, 1, features:_*).mul(y)
- }.reduce(_.addi(_))
-
- // Define a function to compute the leave one out cross validation error
- // for a single example
- def crossValidate(lambda: Double): (Double, Double, DoubleMatrix) = {
- // Compute the MLE ridge regression parameter value
-
- // Ridge Regression parameter = inv(XtX + \lambda*I) * Xty
- val XtXlambda = DoubleMatrix.eye(nfeatures).muli(lambda).addi(XtX)
- val w = Solve.solveSymmetric(XtXlambda, Xty)
-
- val invXtX = Solve.solveSymmetric(XtXlambda, DoubleMatrix.eye(nfeatures))
-
- // compute the generalized cross validation score
- val cverror = data.map {
- case (y, features) =>
- val x = new DoubleMatrix(features.length, 1, features:_*)
- val yhat = w.transpose().mmul(x).get(0)
- val H_ii = x.transpose().mmul(invXtX).mmul(x).get(0)
- val residual = (y - yhat) / (1.0 - H_ii)
- residual * residual
- }.reduce(_ + _) / nexamples
-
- (lambda, cverror, w)
- }
-
- // Binary search for the best assignment to lambda.
- def binSearch(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = {
- val buffer = mutable.ListBuffer.empty[(Double, Double, DoubleMatrix)]
-
- @tailrec
- def loop(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = {
- val mid = (high - low) / 2 + low
- val lowValue = crossValidate((mid - low) / 2 + low)
- val highValue = crossValidate((high - mid) / 2 + mid)
- val (newLow, newHigh) = if (lowValue._2 < highValue._2) {
- (low, mid + (high-low)/4)
- } else {
- (mid - (high-low)/4, high)
- }
- if (newHigh - newLow > 1.0E-7) {
- buffer += lowValue += highValue
- loop(newLow, newHigh)
- } else {
- buffer += lowValue += highValue
- buffer.result()
- }
- }
-
- loop(low, high)
+ LabeledPoint(yNormalized, featuresNormalized.toArray)
}
- // Actually compute the best lambda
- val lambdas = binSearch(lambdaLow, lambdaHigh).sortBy(_._1)
-
- // Find the best parameter set by taking the lowest cverror.
- val (lambdaOpt, cverror, weights) = lambdas.reduce((a, b) => if (a._2 < b._2) a else b)
-
- // Return the model which contains the solution
- val weightsScaled = weights.div(xColSd)
- val intercept = yMean - (weights.transpose().mmul(xColMean.div(xColSd)).get(0))
- val model = new RidgeRegressionModel(weightsScaled, intercept, lambdaOpt, lambdas)
-
- logInfo("RidgeRegression: optimal lambda " + model.lambdaOpt)
- logInfo("RidgeRegression: optimal weights " + model.weights)
- logInfo("RidgeRegression: optimal intercept " + model.intercept)
- logInfo("RidgeRegression: cross-validation error " + cverror)
-
- model
+ super.run(normalizedData, initialWeights)
}
}
/**
- * Top-level methods for calling Ridge Regression.
+ * Top-level methods for calling RidgeRegression.
*/
-object RidgeRegression {
- // NOTE(shivaram): We use multiple train methods instead of default arguments to support
- // Java programs.
+object RidgeRegressionWithSGD {
+
+ /**
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : RidgeRegressionModel =
+ {
+ new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(
+ input, initialWeights)
+ }
+
+ /**
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double)
+ : RidgeRegressionModel =
+ {
+ new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
+ }
/**
- * Train a ridge regression model given an RDD of (response, features) pairs.
- * We use the closed form solution to compute the cross-validation score for
- * a given lambda. The optimal lambda is computed by performing binary search
- * between the provided bounds of lambda.
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
*
- * @param input RDD of (response, array of features) pairs.
- * @param lambdaLow lower bound used in binary search for lambda
- * @param lambdaHigh upper bound used in binary search for lambda
+ * @param input RDD of (label, array of features) pairs.
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param regParam Regularization parameter.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a RidgeRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[LabeledPoint],
- lambdaLow: Double,
- lambdaHigh: Double)
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double)
: RidgeRegressionModel =
{
- new RidgeRegression(lambdaLow, lambdaHigh).train(input)
+ train(input, numIterations, stepSize, regParam, 1.0)
}
/**
- * Train a ridge regression model given an RDD of (response, features) pairs.
- * We use the closed form solution to compute the cross-validation score for
- * a given lambda. The optimal lambda is computed by performing binary search
- * between lambda values of 0 and 100.
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
*
- * @param input RDD of (response, array of features) pairs.
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a RidgeRegressionModel which has the weights and offset from training.
*/
- def train(input: RDD[LabeledPoint]) : RidgeRegressionModel = {
- train(input, 0.0, 100.0)
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int)
+ : RidgeRegressionModel =
+ {
+ train(input, numIterations, 1.0, 1.0, 1.0)
}
def main(args: Array[String]) {
- if (args.length != 2) {
- println("Usage: RidgeRegression <master> <input_dir>")
+ if (args.length != 5) {
+ println("Usage: RidgeRegression <master> <input_dir> <step_size> <regularization_parameter>" +
+ " <niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "RidgeRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
- val model = RidgeRegression.train(data, 0, 1000)
+ val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
+ args(3).toDouble)
+
sc.stop()
}
}
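A Scala counterpart to the Java suite added later in this change (object name is a placeholder), showing the intended effect of regParam: on noisy synthetic data the L2 penalty shrinks the weights relative to the unregularized fit.

    import spark.SparkContext
    import spark.mllib.regression.RidgeRegressionWithSGD
    import spark.mllib.util.LinearDataGenerator

    object RidgeRegressionSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "RidgeRegressionSketch")
        val data = LinearDataGenerator.generateLinearRDD(sc, 200, 20, 10.0)
        // Same data, two regularization strengths: train(input, numIterations, stepSize, regParam).
        val unregularized = RidgeRegressionWithSGD.train(data, 200, 1.0, 0.0)
        val regularized = RidgeRegressionWithSGD.train(data, 200, 1.0, 0.1)
        def norm(ws: Array[Double]) = math.sqrt(ws.map(w => w * w).sum)
        println("||w|| with regParam 0.0: " + norm(unregularized.weights))
        println("||w|| with regParam 0.1: " + norm(regularized.weights))
        sc.stop()
      }
    }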
diff --git a/mllib/src/main/scala/spark/mllib/util/DataValidators.scala b/mllib/src/main/scala/spark/mllib/util/DataValidators.scala
new file mode 100644
index 0000000000..57553accf1
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/DataValidators.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import spark.{RDD, Logging}
+import spark.mllib.regression.LabeledPoint
+
+/**
+ * A collection of methods used to validate data before applying ML algorithms.
+ */
+object DataValidators extends Logging {
+
+ /**
+ * Function to check if labels used for classification are either zero or one.
+ *
+ * @param data - input data set that needs to be checked
+ *
+ * @return True if labels are all zero or one, false otherwise.
+ */
+ val classificationLabels: RDD[LabeledPoint] => Boolean = { data =>
+ val numInvalid = data.filter(x => x.label != 1.0 && x.label != 0.0).count()
+ if (numInvalid != 0) {
+ logError("Classification labels should be 0 or 1. Found " + numInvalid + " invalid labels")
+ }
+ numInvalid == 0
+ }
+}
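A tiny usage sketch of the new validator (object name is a placeholder); it is an ordinary RDD[LabeledPoint] => Boolean function, so it can also be called directly outside of a training run:

    import spark.SparkContext
    import spark.mllib.regression.LabeledPoint
    import spark.mllib.util.DataValidators

    object DataValidatorsSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "DataValidatorsSketch")
        val good = sc.parallelize(Seq(LabeledPoint(0.0, Array(1.0)), LabeledPoint(1.0, Array(2.0))))
        val bad = sc.parallelize(Seq(LabeledPoint(2.0, Array(1.0))))
        println(DataValidators.classificationLabels(good))  // true
        println(DataValidators.classificationLabels(bad))   // false, after logging the invalid count
        sc.stop()
      }
    }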
diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
deleted file mode 100644
index eeb14fc4e3..0000000000
--- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-package spark.mllib.util
-
-import scala.util.Random
-
-import org.jblas.DoubleMatrix
-
-import spark.{RDD, SparkContext}
-import spark.mllib.regression.LabeledPoint
-
-/**
- * Generate sample data used for Lasso Regression. This class generates uniform random values
- * for the features and adds Gaussian noise with weight 0.1 to generate response variables.
- */
-object LassoDataGenerator {
-
- def main(args: Array[String]) {
- if (args.length < 2) {
- println("Usage: LassoGenerator " +
- "<master> <output_dir> [num_examples] [num_features] [num_partitions]")
- System.exit(1)
- }
-
- val sparkMaster: String = args(0)
- val outputPath: String = args(1)
- val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
- val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
- val parts: Int = if (args.length > 4) args(4).toInt else 2
-
- val sc = new SparkContext(sparkMaster, "LassoGenerator")
-
- val globalRnd = new Random(94720)
- val trueWeights = new DoubleMatrix(1, nfeatures+1,
- Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
-
- val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
- val rnd = new Random(42 + idx)
-
- val x = Array.fill[Double](nfeatures) {
- rnd.nextDouble() * 2.0 - 1.0
- }
- val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
- LabeledPoint(y, x)
- }
-
- MLUtils.saveLabeledData(data, outputPath)
- sc.stop()
- }
-}
diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala
index 4d329168be..9f48477f84 100644
--- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala
@@ -17,66 +17,101 @@
package spark.mllib.util
+import scala.collection.JavaConversions._
import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
import spark.mllib.regression.LabeledPoint
/**
- * Generate sample data used for RidgeRegression. This class generates
+ * Generate sample data used for linear regression. This class generates
* uniformly random values for every feature and adds Gaussian noise with mean `eps` to the
* response variable `Y`.
- *
*/
-object RidgeRegressionDataGenerator {
+object LinearDataGenerator {
+
+ /**
+   * Return a Java List of synthetic data randomly generated according to a multicollinear model.
+ * @param intercept Data intercept
+ * @param weights Weights to be applied.
+ * @param nPoints Number of points in sample.
+   * @param seed Random seed
+   * @param eps Epsilon scaling factor for the Gaussian noise.
+   * @return Java List of input.
+ */
+ def generateLinearInputAsList(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int,
+ eps: Double): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateLinearInput(intercept, weights, nPoints, seed, eps))
+ }
/**
- * Generate an RDD containing sample data for RidgeRegression.
+   * Generate a Seq of synthetic data sampled from the linear model y = intercept + weights . x + eps * N(0, 1).
+   *
+ * @param intercept Data intercept
+ * @param weights Weights to be applied.
+ * @param nPoints Number of points in sample.
+ * @param seed Random seed
+ * @param eps Epsilon scaling factor.
+   * @return Seq of generated LabeledPoints.
+ */
+ def generateLinearInput(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int,
+ eps: Double = 0.1): Seq[LabeledPoint] = {
+
+ val rnd = new Random(seed)
+ val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(2 * rnd.nextDouble - 1.0))
+ val y = x.map { xi =>
+ (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + eps * rnd.nextGaussian()
+ }
+ y.zip(x).map(p => LabeledPoint(p._1, p._2))
+ }
+
+ /**
+ * Generate an RDD containing sample data for Linear Regression models - including Ridge, Lasso,
+ * and unregularized variants.
*
* @param sc SparkContext to be used for generating the RDD.
* @param nexamples Number of examples that will be contained in the RDD.
* @param nfeatures Number of features to generate for each example.
* @param eps Epsilon factor by which examples are scaled.
+ * @param weights Weights associated with the first weights.length features.
* @param nparts Number of partitions in the RDD. Default value is 2.
*
* @return RDD of LabeledPoint containing sample data.
*/
- def generateRidgeRDD(
- sc: SparkContext,
- nexamples: Int,
- nfeatures: Int,
- eps: Double,
- nparts: Int = 2) : RDD[LabeledPoint] = {
+ def generateLinearRDD(
+ sc: SparkContext,
+ nexamples: Int,
+ nfeatures: Int,
+ eps: Double,
+ nparts: Int = 2,
+ intercept: Double = 0.0) : RDD[LabeledPoint] = {
org.jblas.util.Random.seed(42)
// Random values distributed uniformly in [-0.5, 0.5]
val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
- w.put(0, 0, 10)
- w.put(1, 0, 10)
val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
- org.jblas.util.Random.seed(42 + p)
+ val seed = 42 + p
val examplesInPartition = nexamples / nparts
-
- val X = DoubleMatrix.rand(examplesInPartition, nfeatures)
- val y = X.mmul(w)
-
- val rnd = new Random(42 + p)
-
- val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps)
- val yObs = new DoubleMatrix(normalValues).addi(y)
-
- Iterator.tabulate(examplesInPartition) { i =>
- LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray)
- }
+ generateLinearInput(intercept, w.toArray, examplesInPartition, seed, eps)
}
data
}
def main(args: Array[String]) {
if (args.length < 2) {
- println("Usage: RidgeRegressionGenerator " +
+ println("Usage: LinearDataGenerator " +
"<master> <output_dir> [num_examples] [num_features] [num_partitions]")
System.exit(1)
}
@@ -88,8 +123,8 @@ object RidgeRegressionDataGenerator {
val parts: Int = if (args.length > 4) args(4).toInt else 2
val eps = 10
- val sc = new SparkContext(sparkMaster, "RidgeRegressionDataGenerator")
- val data = generateRidgeRDD(sc, nexamples, nfeatures, eps, parts)
+ val sc = new SparkContext(sparkMaster, "LinearDataGenerator")
+ val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts)
MLUtils.saveLabeledData(data, outputPath)
sc.stop()
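A quick local sketch of the renamed generator (values are arbitrary); generateLinearInput is deterministic for a fixed seed, which is what the new regression suites rely on:

    import spark.mllib.util.LinearDataGenerator

    object LinearDataSketch {
      def main(args: Array[String]) {
        // Features uniform in [-1, 1]; label = 2.0 + weights . x + eps * N(0, 1).
        val points = LinearDataGenerator.generateLinearInput(
          intercept = 2.0, weights = Array(1.0, -0.5), nPoints = 5, seed = 42, eps = 0.1)
        points.foreach(p => println(p.label + " <- " + p.features.mkString(", ")))
      }
    }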
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
index 4e030a81b4..a8e6ae9953 100644
--- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -72,16 +72,16 @@ object MLUtils {
* xColMean - Row vector with mean for every column (or feature) of the input data
* xColSd - Row vector standard deviation for every column (or feature) of the input data.
*/
- def computeStats(data: RDD[(Double, Array[Double])], nfeatures: Int, nexamples: Long):
+ def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
(Double, DoubleMatrix, DoubleMatrix) = {
- val yMean: Double = data.map { case (y, features) => y }.reduce(_ + _) / nexamples
+ val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples
// NOTE: We shuffle X by column here to compute column sum and sum of squares.
- val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { case(y, features) =>
- val nCols = features.length
+ val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint =>
+ val nCols = labeledPoint.features.length
// Traverse over every column and emit (col, value, value^2)
Iterator.tabulate(nCols) { i =>
- (i, (features(i), features(i)*features(i)))
+ (i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i)))
}
}.reduceByKey { case(x1, x2) =>
(x1._1 + x2._1, x1._2 + x2._2)
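With this change computeStats is called directly on RDD[LabeledPoint]; a small sketch mirroring how Lasso and RidgeRegression use the returned statistics (the data is synthetic and the object name a placeholder):

    import org.jblas.DoubleMatrix
    import spark.SparkContext
    import spark.mllib.regression.LabeledPoint
    import spark.mllib.util.{LinearDataGenerator, MLUtils}

    object ComputeStatsSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "ComputeStatsSketch")
        val data = LinearDataGenerator.generateLinearRDD(sc, 100, 3, 0.1)
        val (yMean, xColMean, xColSd) = MLUtils.computeStats(data, 3, data.count())
        // Center the labels and standardize the features, as the regression trainers now do.
        val normalized = data.map { p =>
          val features = new DoubleMatrix(3, 1, p.features:_*).sub(xColMean).divi(xColSd)
          LabeledPoint(p.label - yMean, features.toArray)
        }
        println("normalized " + normalized.count() + " points; label mean was " + yMean)
        sc.stop()
      }
    }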
diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
index e02bd190f6..eff456cad6 100644
--- a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
@@ -1,7 +1,6 @@
package spark.mllib.util
import scala.util.Random
-import scala.math.signum
import spark.{RDD, SparkContext}
@@ -30,8 +29,8 @@ object SVMDataGenerator {
val sc = new SparkContext(sparkMaster, "SVMGenerator")
val globalRnd = new Random(94720)
- val trueWeights = new DoubleMatrix(1, nfeatures+1,
- Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
+ val trueWeights = new DoubleMatrix(1, nfeatures + 1,
+ Array.fill[Double](nfeatures + 1)(globalRnd.nextGaussian()):_*)
val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
val rnd = new Random(42 + idx)
@@ -39,11 +38,13 @@ object SVMDataGenerator {
val x = Array.fill[Double](nfeatures) {
rnd.nextDouble() * 2.0 - 1.0
}
- val y = signum((new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1)
+ val yD = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
+ val y = if (yD < 0) 0.0 else 1.0
LabeledPoint(y, x)
}
MLUtils.saveLabeledData(data, outputPath)
+
sc.stop()
}
}
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java
index e26d7b385c..5863140baf 100644
--- a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java
+++ b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
+import spark.mllib.util.LinearDataGenerator;
public class JavaLassoSuite implements Serializable {
private transient JavaSparkContext sc;
@@ -61,16 +62,16 @@ public class JavaLassoSuite implements Serializable {
double A = 2.0;
double[] weights = {-1.5, 1.0e-2};
- JavaRDD<LabeledPoint> testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A,
- weights, nPoints, 42), 2).cache();
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
+ weights, nPoints, 42, 0.1), 2).cache();
List<LabeledPoint> validationData =
- LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17);
+ LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1);
- LassoWithSGD svmSGDImpl = new LassoWithSGD();
- svmSGDImpl.optimizer().setStepSize(1.0)
+ LassoWithSGD lassoSGDImpl = new LassoWithSGD();
+ lassoSGDImpl.optimizer().setStepSize(1.0)
.setRegParam(0.01)
.setNumIterations(20);
- LassoModel model = svmSGDImpl.run(testRDD.rdd());
+ LassoModel model = lassoSGDImpl.run(testRDD.rdd());
int numAccurate = validatePrediction(validationData, model);
Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
@@ -82,10 +83,10 @@ public class JavaLassoSuite implements Serializable {
double A = 2.0;
double[] weights = {-1.5, 1.0e-2};
- JavaRDD<LabeledPoint> testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A,
- weights, nPoints, 42), 2).cache();
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
+ weights, nPoints, 42, 0.1), 2).cache();
List<LabeledPoint> validationData =
- LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17);
+ LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1);
LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0);
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java
new file mode 100644
index 0000000000..50716c7861
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.mllib.util.LinearDataGenerator;
+
+public class JavaLinearRegressionSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaLinearRegressionSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) {
+ int numAccurate = 0;
+ for (LabeledPoint point: validationData) {
+ Double prediction = model.predict(point.features());
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ if (Math.abs(prediction - point.label()) <= 0.5) {
+ numAccurate++;
+ }
+ }
+ return numAccurate;
+ }
+
+ @Test
+ public void runLinearRegressionUsingConstructor() {
+ int nPoints = 100;
+ double A = 3.0;
+ double[] weights = {10, 10};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(
+ LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache();
+ List<LabeledPoint> validationData =
+ LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1);
+
+ LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD();
+ LinearRegressionModel model = linSGDImpl.run(testRDD.rdd());
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+ @Test
+ public void runLinearRegressionUsingStaticMethods() {
+ int nPoints = 100;
+ double A = 3.0;
+ double[] weights = {10, 10};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(
+ LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache();
+ List<LabeledPoint> validationData =
+ LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1);
+
+ LinearRegressionModel model = LinearRegressionWithSGD.train(testRDD.rdd(), 100);
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+}
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java
new file mode 100644
index 0000000000..2c0aabad30
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.jblas.DoubleMatrix;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.mllib.util.LinearDataGenerator;
+
+public class JavaRidgeRegressionSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaRidgeRegressionSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ double predictionError(List<LabeledPoint> validationData, RidgeRegressionModel model) {
+ double errorSum = 0;
+ for (LabeledPoint point: validationData) {
+ Double prediction = model.predict(point.features());
+ errorSum += (prediction - point.label()) * (prediction - point.label());
+ }
+ return errorSum / validationData.size();
+ }
+
+ List<LabeledPoint> generateRidgeData(int numPoints, int nfeatures, double eps) {
+ org.jblas.util.Random.seed(42);
+ // Pick weights as random values distributed uniformly in [-0.5, 0.5]
+ DoubleMatrix w = DoubleMatrix.rand(nfeatures, 1).subi(0.5);
+ // Set first two weights to eps
+ w.put(0, 0, eps);
+ w.put(1, 0, eps);
+ return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, eps);
+ }
+
+ @Test
+ public void runRidgeRegressionUsingConstructor() {
+ int nexamples = 200;
+ int nfeatures = 20;
+ double eps = 10.0;
+ List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
+ List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+
+ RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD();
+ ridgeSGDImpl.optimizer().setStepSize(1.0)
+ .setRegParam(0.0)
+ .setNumIterations(200);
+ RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd());
+ double unRegularizedErr = predictionError(validationData, model);
+
+ ridgeSGDImpl.optimizer().setRegParam(0.1);
+ model = ridgeSGDImpl.run(testRDD.rdd());
+ double regularizedErr = predictionError(validationData, model);
+
+ Assert.assertTrue(regularizedErr < unRegularizedErr);
+ }
+
+ @Test
+ public void runRidgeRegressionUsingStaticMethods() {
+ int nexamples = 200;
+ int nfeatures = 20;
+ double eps = 10.0;
+ List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
+ List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+
+ RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.0);
+ double unRegularizedErr = predictionError(validationData, model);
+
+ model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.1);
+ double regularizedErr = predictionError(validationData, model);
+
+ Assert.assertTrue(regularizedErr < unRegularizedErr);
+ }
+}
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index 04f631d80f..894ae458ad 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -48,13 +48,11 @@ object SVMSuite {
val rnd = new Random(seed)
val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
val x = Array.fill[Array[Double]](nPoints)(
- Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ Array.fill[Double](weights.length)(rnd.nextDouble() * 2.0 - 1.0))
val y = x.map { xi =>
- signum(
- (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
- intercept +
- 0.1 * rnd.nextGaussian()
- ).toInt
+ val yD = (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
+ intercept + 0.01 * rnd.nextGaussian()
+ if (yD < 0) 0.0 else 1.0
}
y.zip(x).map(p => LabeledPoint(p._1, p._2))
}
@@ -85,7 +83,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
test("SVM using local random SGD") {
val nPoints = 10000
- val A = 2.0
+ // NOTE: Intercept should be small for generating approximately equal numbers of 0s and 1s
+ val A = 0.01
val B = -1.5
val C = 1.0
@@ -100,7 +99,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
val model = svm.run(testRDD)
val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
- val validationRDD = sc.parallelize(validationData,2)
+ val validationRDD = sc.parallelize(validationData, 2)
// Test prediction on RDD.
validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
@@ -112,7 +111,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
test("SVM local random SGD with initial weights") {
val nPoints = 10000
- val A = 2.0
+ // NOTE: Intercept should be small for generating approximately equal numbers of 0s and 1s
+ val A = 0.01
val B = -1.5
val C = 1.0
@@ -139,4 +139,31 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
+
+ test("SVM with invalid labels") {
+ val nPoints = 10000
+
+ // NOTE: Intercept should be small for generating approximately equal numbers of 0s and 1s
+ val A = 0.01
+ val B = -1.5
+ val C = 1.0
+
+ val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+ val testRDD = sc.parallelize(testData, 2)
+
+ val testRDDInvalid = testRDD.map { lp =>
+ if (lp.label == 0.0) {
+ LabeledPoint(-1.0, lp.features)
+ } else {
+ lp
+ }
+ }
+
+ intercept[spark.SparkException] {
+ val model = SVMWithSGD.train(testRDDInvalid, 100)
+ }
+
+ // Turning off data validation should not throw an exception
+ val noValidationModel = new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
+ }
}
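
The new "SVM with invalid labels" test above relies on input validation rejecting labels other than 0.0 and 1.0 unless setValidateData(false) is called. As a hedged illustration of the shape such a check could take (this is not the actual MLlib validator; the object and method names are hypothetical), a minimal Scala sketch:

    // Hypothetical sketch only: a binary-label predicate of the kind that
    // setValidateData(false) would bypass in the test above.
    import spark.RDD
    import spark.mllib.regression.LabeledPoint

    object LabelCheckSketch {
      // Valid iff every label is exactly 0.0 or 1.0; a caller could raise
      // a SparkException when this returns false.
      def binaryLabels(data: RDD[LabeledPoint]): Boolean =
        data.filter(lp => lp.label != 0.0 && lp.label != 1.0).count() == 0
    }
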
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index 55a738f1e4..622dbbab7f 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -24,37 +24,8 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import spark.SparkContext
+import spark.mllib.util.LinearDataGenerator
-import org.jblas.DoubleMatrix
-
-object LassoSuite {
-
- def generateLassoInputAsList(
- intercept: Double,
- weights: Array[Double],
- nPoints: Int,
- seed: Int): java.util.List[LabeledPoint] = {
- seqAsJavaList(generateLassoInput(intercept, weights, nPoints, seed))
- }
-
-
- // Generate noisy input of the form Y = x.dot(weights) + intercept + noise
- def generateLassoInput(
- intercept: Double,
- weights: Array[Double],
- nPoints: Int,
- seed: Int): Seq[LabeledPoint] = {
- val rnd = new Random(seed)
- val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
- val x = Array.fill[Array[Double]](nPoints)(
- Array.fill[Double](weights.length)(rnd.nextGaussian()))
- val y = x.map(xi =>
- (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
- )
- y.zip(x).map(p => LabeledPoint(p._1, p._2))
- }
-
-}
class LassoSuite extends FunSuite with BeforeAndAfterAll {
@transient private var sc: SparkContext = _
@@ -85,7 +56,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
val B = -1.5
val C = 1.0e-2
- val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42)
+ val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
@@ -101,7 +72,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
- val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17)
+ val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
val validationRDD = sc.parallelize(validationData, 2)
// Test prediction on RDD.
@@ -118,7 +89,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
val B = -1.5
val C = 1.0e-2
- val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42)
+ val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
val initialB = -1.0
val initialC = -1.0
@@ -138,7 +109,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
- val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17)
+ val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
val validationRDD = sc.parallelize(validationData,2)
// Test prediction on RDD.
diff --git a/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala
new file mode 100644
index 0000000000..acc48a3283
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+import spark.SparkContext._
+import spark.mllib.util.LinearDataGenerator
+
+class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
+ // A prediction is off if the prediction is more than 0.5 away from the expected value.
+ math.abs(prediction - expected.label) > 0.5
+ }.size
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+ // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2
+ test("linear regression") {
+ val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 42), 2).cache()
+ val linReg = new LinearRegressionWithSGD()
+ linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+ val model = linReg.run(testRDD)
+
+ assert(model.intercept >= 2.5 && model.intercept <= 3.5)
+ assert(model.weights.length === 2)
+ assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
+ assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 17)
+ val validationRDD = sc.parallelize(validationData, 2).cache()
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+}
diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
index e2b244894d..c482035706 100644
--- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,14 +17,16 @@
package spark.mllib.regression
+import scala.collection.JavaConversions._
import scala.util.Random
+import org.jblas.DoubleMatrix
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import spark.SparkContext
import spark.SparkContext._
-
+import spark.mllib.util.LinearDataGenerator
class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
@transient private var sc: SparkContext = _
@@ -38,31 +40,51 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
System.clearProperty("spark.driver.port")
}
- // Test if we can correctly learn Y = 3 + X1 + X2 when
- // X1 and X2 are collinear.
- test("multi-collinear variables") {
- val rnd = new Random(43)
- val x1 = Array.fill[Double](20)(rnd.nextGaussian())
+ def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = {
+ predictions.zip(input).map { case (prediction, expected) =>
+ (prediction - expected.label) * (prediction - expected.label)
+ }.reduceLeft(_ + _) / predictions.size
+ }
+
+ test("regularization with skewed weights") {
+ val nexamples = 200
+ val nfeatures = 20
+ val eps = 10
+
+ org.jblas.util.Random.seed(42)
+ // Pick weights as random values distributed uniformly in [-0.5, 0.5]
+ val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+ // Set first two weights to eps
+ w.put(0, 0, eps)
+ w.put(1, 0, eps)
- // Pick a mean close to mean of x1
- val rnd1 = new Random(42) //new NormalDistribution(0.1, 0.01)
- val x2 = Array.fill[Double](20)(0.1 + rnd1.nextGaussian() * 0.01)
+ // Use half of the data for training and the other half for validation
+ val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps)
+ val testData = data.take(nexamples)
+ val validationData = data.takeRight(nexamples)
- val xMat = (0 until 20).map(i => Array(x1(i), x2(i))).toArray
+ val testRDD = sc.parallelize(testData, 2).cache()
+ val validationRDD = sc.parallelize(validationData, 2).cache()
- val y = xMat.map(i => 3 + i(0) + i(1))
- val testData = (0 until 20).map(i => LabeledPoint(y(i), xMat(i))).toArray
+ // First run without regularization.
+ val linearReg = new LinearRegressionWithSGD()
+ linearReg.optimizer.setNumIterations(200)
+ .setStepSize(1.0)
- val testRDD = sc.parallelize(testData, 2)
- testRDD.cache()
- val ridgeReg = new RidgeRegression().setLowLambda(0)
- .setHighLambda(10)
+ val linearModel = linearReg.run(testRDD)
+ val linearErr = predictionError(
+ linearModel.predict(validationRDD.map(_.features)).collect(), validationData)
- val model = ridgeReg.train(testRDD)
+ val ridgeReg = new RidgeRegressionWithSGD()
+ ridgeReg.optimizer.setNumIterations(200)
+ .setRegParam(0.1)
+ .setStepSize(1.0)
+ val ridgeModel = ridgeReg.run(testRDD)
+ val ridgeErr = predictionError(
+ ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData)
- assert(model.intercept >= 2.9 && model.intercept <= 3.1)
- assert(model.weights.length === 2)
- assert(model.weights.get(0) >= 0.9 && model.weights.get(0) <= 1.1)
- assert(model.weights.get(1) >= 0.9 && model.weights.get(1) <= 1.1)
+ // Ridge CV-error should be lower than linear regression
+ assert(ridgeErr < linearErr,
+ "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")")
}
}
diff --git a/pom.xml b/pom.xml
index 85bcd8696c..e2fd54a966 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
<module>tools</module>
<module>streaming</module>
<module>repl</module>
+ <module>assembly</module>
</modules>
<properties>
@@ -75,7 +76,7 @@
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>1.0.4</hadoop.version>
- <!-- <hadoop.version>2.0.0-mr1-cdh4.1.2</hadoop.version> -->
+ <hbase.version>0.94.6</hbase.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
@@ -492,7 +493,7 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>${project.build.directory}/SparkTestSuite.txt</filereports>
- <argLine>-Xms64m -Xmx1024m</argLine>
+ <argLine>-Xms64m -Xmx3g</argLine>
<stderr/>
</configuration>
<executions>
@@ -743,22 +744,11 @@
</dependencyManagement>
</profile>
<profile>
- <id>assembly</id>
+ <id>repl-bin</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<modules>
- <module>assembly</module>
- </modules>
- </profile>
- <profile>
- <id>expensive-modules</id>
- <activation>
- <property>
- <name>!noExpensive</name>
- </property>
- </activation>
- <modules>
<module>repl-bin</module>
</modules>
</profile>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5fdcf19b62..2e26812671 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -26,30 +26,35 @@ import AssemblyKeys._
object SparkBuild extends Build {
// Hadoop version to build against. For example, "1.0.4" for Apache releases, or
// "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set
- // through the environment variables SPARK_HADOOP_VERSION and SPARK_WITH_YARN.
+ // through the environment variables SPARK_HADOOP_VERSION and SPARK_YARN.
val DEFAULT_HADOOP_VERSION = "1.0.4"
- val DEFAULT_WITH_YARN = false
+ val DEFAULT_YARN = false
// HBase version; set as appropriate.
val HBASE_VERSION = "0.94.6"
- lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects:_*)
+ lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects: _*)
lazy val core = Project("core", file("core"), settings = coreSettings)
- lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn(core) dependsOn(bagel) dependsOn(mllib) dependsOn(maybeYarn:_*)
+ lazy val repl = Project("repl", file("repl"), settings = replSettings)
+ .dependsOn(core, bagel, mllib) dependsOn(maybeYarn: _*)
- lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib)
+ lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
+ .dependsOn(core, mllib, bagel, streaming) dependsOn(maybeYarn: _*)
- lazy val tools = Project("tools", file("tools"), settings = examplesSettings) dependsOn (core) dependsOn (streaming)
+ lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
- lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core)
+ lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
- lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn (core)
+ lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core)
- lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn (core)
+ lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
- lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn (core)
+ lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
+
+ lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
+ .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
// A configuration to set an alternative publishLocalConfiguration
lazy val MavenCompile = config("m2r") extend(Compile)
@@ -57,15 +62,16 @@ object SparkBuild extends Build {
// Allows build configuration to be set through environment variables
lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
- lazy val isYarnEnabled = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
- case None => DEFAULT_WITH_YARN
+ lazy val isYarnEnabled = scala.util.Properties.envOrNone("SPARK_YARN") match {
+ case None => DEFAULT_YARN
case Some(v) => v.toBoolean
}
// Conditionally include the yarn sub-project
lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
- lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef
+ lazy val allProjects = Seq[ProjectReference](
+ core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
@@ -80,7 +86,7 @@ object SparkBuild extends Build {
// Fork new JVMs for tests and set Java options for those
fork := true,
- javaOptions += "-Xmx2500m",
+ javaOptions += "-Xmx3g",
// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
@@ -100,8 +106,8 @@ object SparkBuild extends Build {
<url>http://spark-project.org/</url>
<licenses>
<license>
- <name>BSD License</name>
- <url>https://github.com/mesos/spark/blob/master/LICENSE</url>
+ <name>Apache 2.0 License</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
<distribution>repo</distribution>
</license>
</licenses>
@@ -195,7 +201,7 @@ object SparkBuild extends Build {
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
)
- ) ++ assemblySettings ++ extraAssemblySettings
+ )
def rootSettings = sharedSettings ++ Seq(
publish := {}
@@ -204,7 +210,7 @@ object SparkBuild extends Build {
def replSettings = sharedSettings ++ Seq(
name := "spark-repl",
libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _)
- ) ++ assemblySettings ++ extraAssemblySettings
+ )
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
@@ -223,7 +229,7 @@ object SparkBuild extends Build {
exclude("org.apache.cassandra.deps", "avro")
excludeAll(excludeSnappy)
)
- )
+ ) ++ assemblySettings ++ extraAssemblySettings
def toolsSettings = sharedSettings ++ Seq(
name := "spark-tools"
@@ -251,11 +257,11 @@ object SparkBuild extends Build {
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty)
)
- ) ++ assemblySettings ++ extraAssemblySettings
+ )
def yarnSettings = sharedSettings ++ Seq(
name := "spark-yarn"
- ) ++ extraYarnSettings ++ assemblySettings ++ extraAssemblySettings
+ ) ++ extraYarnSettings
// Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
// if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
@@ -271,7 +277,13 @@ object SparkBuild extends Build {
)
)
- def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
+ def assemblyProjSettings = sharedSettings ++ Seq(
+ name := "spark-assembly",
+ jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" }
+ ) ++ assemblySettings ++ extraAssemblySettings
+
+ def extraAssemblySettings() = Seq(
+ test in assembly := {},
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
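
For reference, the jarName setting in assemblyProjSettings above builds the assembly file name from the project version and the Hadoop version. A standalone sketch of that string construction, using an illustrative (not authoritative) version number:

    // Illustrative only: how the assembly jar name is composed.
    object AssemblyNameSketch extends App {
      val v = "0.8.0-SNAPSHOT"       // hypothetical project version
      val hadoopVersion = "1.0.4"    // matches DEFAULT_HADOOP_VERSION above
      println("spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar")
      // prints: spark-assembly-0.8.0-SNAPSHOT-hadoop1.0.4.jar
    }
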
diff --git a/project/build.properties b/project/build.properties
index 08e17131f6..9647277162 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -15,4 +15,4 @@
# limitations under the License.
#
-sbt.version=0.12.3
+sbt.version=0.12.4
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 783b40d4f5..cfcd85082a 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,7 +4,7 @@ resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/release
resolvers += "Spray Repository" at "http://repo.spray.cc/"
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
diff --git a/pyspark b/pyspark
index 801239c108..2dba2ceb21 100755
--- a/pyspark
+++ b/pyspark
@@ -23,11 +23,17 @@ FWDIR="$(cd `dirname $0`; pwd)"
# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
+SCALA_VERSION=2.9.3
+
# Exit if the user hasn't compiled Spark
-if [ ! -e "$SPARK_HOME/repl/target" ]; then
- echo "Failed to find Spark classes in $SPARK_HOME/repl/target" >&2
- echo "You need to compile Spark before running this program" >&2
- exit 1
+if [ ! -f "$FWDIR/RELEASE" ]; then
+ # Exit if the user hasn't compiled Spark
+ ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
+ if [[ $? != 0 ]]; then
+ echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
+ echo "You need to compile Spark before running this program" >&2
+ exit 1
+ fi
fi
# Load environment variables from conf/spark-env.sh, if it exists
@@ -48,11 +54,6 @@ export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
-# Launch with `scala` by default:
-if [[ "$SPARK_LAUNCH_WITH_SCALA" != "0" ]] ; then
- export SPARK_LAUNCH_WITH_SCALA=1
-fi
-
if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index fdc9851479..3ccf062c86 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,7 +29,7 @@ SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and SPARK_MEM settings from spark-env.sh
- command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
+ command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_function():
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1e9b3bb5c0..dfc518a7b0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -754,6 +754,43 @@ class RDD(object):
"""
return python_cogroup(self, other, numPartitions)
+ def subtractByKey(self, other, numPartitions=None):
+ """
+ Return each (key, value) pair in C{self} that has no pair with matching key
+ in C{other}.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
+ >>> y = sc.parallelize([("a", 3), ("c", None)])
+ >>> sorted(x.subtractByKey(y).collect())
+ [('b', 4), ('b', 5)]
+ """
+ filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
+ map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+ return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
+
+ def subtract(self, other, numPartitions=None):
+ """
+ Return each value in C{self} that is not contained in C{other}.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
+ >>> y = sc.parallelize([("a", 3), ("c", None)])
+ >>> sorted(x.subtract(y).collect())
+ [('a', 1), ('b', 4), ('b', 5)]
+ """
+ rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder
+ return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder
+
+ def keyBy(self, f):
+ """
+ Creates tuples of the elements in this RDD by applying C{f}.
+
+ >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
+ >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
+ >>> sorted(x.cogroup(y).collect())
+ [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
+ """
+ return self.map(lambda x: (f(x), x))
+
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
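
The subtract() method added above piggybacks on subtractByKey() by pairing every element with a throwaway True value. For readers following the Scala side of the codebase, here is a hedged sketch of the same cogroup/filter/flatMap idea over Spark's Scala pair-RDD API; the object and variable names are illustrative only, not part of this patch:

    // Illustrative sketch: set-difference by key via cogroup, mirroring subtractByKey above.
    import spark.SparkContext
    import spark.SparkContext._   // pair-RDD operations such as cogroup

    object SubtractSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SubtractSketch")
        val x = sc.parallelize(Seq(("a", 1), ("b", 4), ("b", 5), ("a", 2)), 2)
        val y = sc.parallelize(Seq(("a", 3), ("c", 0)), 2)
        // Keep keys present in x but absent from y, then re-expand their values.
        val diff = x.cogroup(y)
          .filter { case (_, (xs, ys)) => xs.nonEmpty && ys.isEmpty }
          .flatMap { case (k, (xs, _)) => xs.map(v => (k, v)) }
        println(diff.collect().toList.sorted)   // List((b,4), (b,5))
        sc.stop()
      }
    }
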
diff --git a/run-example b/run-example
new file mode 100755
index 0000000000..ccd4356bdf
--- /dev/null
+++ b/run-example
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCALA_VERSION=2.9.3
+
+# Figure out where the Scala framework is installed
+FWDIR="$(cd `dirname $0`; pwd)"
+
+# Export this as SPARK_HOME
+export SPARK_HOME="$FWDIR"
+
+# Load environment variables from conf/spark-env.sh, if it exists
+if [ -e $FWDIR/conf/spark-env.sh ] ; then
+ . $FWDIR/conf/spark-env.sh
+fi
+
+if [ -z "$1" ]; then
+ echo "Usage: run-example <example-class> [<args>]" >&2
+ exit 1
+fi
+
+# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
+# to avoid the -sources and -doc packages that are built by publish-local.
+EXAMPLES_DIR="$FWDIR"/examples
+SPARK_EXAMPLES_JAR=""
+if [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9T].jar ]; then
+ # Use the JAR from the SBT build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9T].jar`
+fi
+if [ -e "$EXAMPLES_DIR"/target/spark-examples*[0-9T].jar ]; then
+ # Use the JAR from the Maven build
+ # TODO: this also needs to become an assembly!
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/spark-examples*[0-9T].jar`
+fi
+if [[ -z $SPARK_EXAMPLES_JAR ]]; then
+ echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
+ echo "You need to compile Spark before running this program" >&2
+ exit 1
+fi
+
+# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
+# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
+CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
+CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"
+
+# Find java binary
+if [ -n "${JAVA_HOME}" ]; then
+ RUNNER="${JAVA_HOME}/bin/java"
+else
+ if [ `command -v java` ]; then
+ RUNNER="java"
+ else
+ echo "JAVA_HOME is not set" >&2
+ exit 1
+ fi
+fi
+
+if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
+ echo -n "Spark Command: "
+ echo "$RUNNER" -cp "$CLASSPATH" "$@"
+ echo "========================================"
+ echo
+fi
+
+exec "$RUNNER" -cp "$CLASSPATH" "$@"
diff --git a/sbt/sbt b/sbt/sbt
index 2227bc4696..c31a0280ff 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -25,4 +25,4 @@ fi
export SPARK_HOME=$(cd "$(dirname $0)/.." 2>&1 >/dev/null ; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS $SBT_OPTS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS $SBT_OPTS -jar "$SPARK_HOME"/sbt/sbt-launch-*.jar "$@"
diff --git a/run b/spark-class
index 3868332c90..5ef3de9773 100755
--- a/run
+++ b/spark-class
@@ -66,51 +66,27 @@ case "$1" in
;;
esac
-# Figure out whether to run our class with java or with the scala launcher.
-# In most cases, we'd prefer to execute our process with java because scala
-# creates a shell script as the parent of its Java process, which makes it
-# hard to kill the child with stuff like Process.destroy(). However, for
-# the Spark shell, the wrapper is necessary to properly reset the terminal
-# when we exit, so we allow it to set a variable to launch with scala.
-# We still fall back on java for the shell if this is a "release" created
-# from make-distribution.sh since it's possible scala is not installed
-# but we have everything we need to run the shell.
-if [[ "$SPARK_LAUNCH_WITH_SCALA" == "1" && ! -f "$FWDIR/RELEASE" ]]; then
- if [ "$SCALA_HOME" ]; then
- RUNNER="${SCALA_HOME}/bin/scala"
- else
- if [ `command -v scala` ]; then
- RUNNER="scala"
- else
- echo "SCALA_HOME is not set and scala is not in PATH" >&2
- exit 1
- fi
- fi
+# Find the java binary
+if [ -n "${JAVA_HOME}" ]; then
+ RUNNER="${JAVA_HOME}/bin/java"
else
- if [ -n "${JAVA_HOME}" ]; then
- RUNNER="${JAVA_HOME}/bin/java"
+ if [ `command -v java` ]; then
+ RUNNER="java"
else
- if [ `command -v java` ]; then
- RUNNER="java"
- else
- echo "JAVA_HOME is not set" >&2
- exit 1
- fi
+ echo "JAVA_HOME is not set" >&2
+ exit 1
fi
- if [[ ! -f "$FWDIR/RELEASE" && -z "$SCALA_LIBRARY_PATH" ]]; then
- if [ -z "$SCALA_HOME" ]; then
- echo "SCALA_HOME is not set" >&2
- exit 1
- fi
- SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
+fi
+if [[ ! -f "$FWDIR/RELEASE" && -z "$SCALA_LIBRARY_PATH" ]]; then
+ if [ -z "$SCALA_HOME" ]; then
+ echo "SCALA_HOME is not set" >&2
+ exit 1
fi
+ SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
fi
-# Figure out how much memory to use per executor and set it as an environment
-# variable so that our process sees it and can report it to Mesos
-if [ -z "$SPARK_MEM" ] ; then
- SPARK_MEM="512m"
-fi
+# Set SPARK_MEM if it isn't already set since we also use it for this process
+SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM
# Set JAVA_OPTS to be able to load native libraries and to set heap size
@@ -125,51 +101,24 @@ export JAVA_OPTS
# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!
if [ ! -f "$FWDIR/RELEASE" ]; then
- CORE_DIR="$FWDIR/core"
- EXAMPLES_DIR="$FWDIR/examples"
- REPL_DIR="$FWDIR/repl"
-
# Exit if the user hasn't compiled Spark
- if [ ! -e "$CORE_DIR/target" ]; then
- echo "Failed to find Spark classes in $CORE_DIR/target" >&2
+ ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*.jar >& /dev/null
+ if [[ $? != 0 ]]; then
+ echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
echo "You need to compile Spark before running this program" >&2
exit 1
fi
-
- if [[ "$@" = *repl* && ! -e "$REPL_DIR/target" ]]; then
- echo "Failed to find Spark classes in $REPL_DIR/target" >&2
- echo "You need to compile Spark repl module before running this program" >&2
- exit 1
- fi
-
- # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
- # to avoid the -sources and -doc packages that are built by publish-local.
- if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the SBT build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
- fi
- if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the Maven build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
- fi
fi
# Compute classpath using external script
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
export CLASSPATH
-if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
- EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS
-else
- # The JVM doesn't read JAVA_OPTS by default so we need to pass it in
- EXTRA_ARGS="$JAVA_OPTS"
-fi
-
-command="$RUNNER -cp \"$CLASSPATH\" $EXTRA_ARGS $@"
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
- echo "Spark Command: $command"
+ echo -n "Spark Command: "
+ echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
echo "========================================"
echo
fi
-exec $command
+exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
diff --git a/spark-executor b/spark-executor
index feccbf5cc2..63692bd46c 100755
--- a/spark-executor
+++ b/spark-executor
@@ -19,4 +19,4 @@
FWDIR="`dirname $0`"
echo "Running spark-executor with framework dir = $FWDIR"
-exec $FWDIR/run spark.executor.MesosExecutorBackend
+exec $FWDIR/spark-class spark.executor.MesosExecutorBackend
diff --git a/spark-shell b/spark-shell
index 62fc18550d..4d379c5cfb 100755
--- a/spark-shell
+++ b/spark-shell
@@ -79,8 +79,7 @@ if [[ ! $? ]]; then
saved_stty=""
fi
-export SPARK_LAUNCH_WITH_SCALA=${SPARK_LAUNCH_WITH_SCALA:-1}
-$FWDIR/run $OPTIONS spark.repl.Main "$@"
+$FWDIR/spark-class $OPTIONS spark.repl.Main "$@"
# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 15dbd1c0fb..0f3b6bc1a6 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -47,6 +47,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var isFinished:Boolean = false
def run() {
+ // Set up the directories so things go to YARN-approved directories rather
+ // than user-specified ones and /tmp
+ System.setProperty("spark.local.dir", getLocalDirs())
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
@@ -89,6 +92,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
System.exit(0)
}
+
+ /** Get the Yarn approved local directories. */
+ private def getLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so let's check both. We assume one of the two is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+ .getOrElse(""))
+
+ if (localDirs.isEmpty()) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ return localDirs
+ }
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
index 9d3860b863..eb2a8cc642 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
@@ -185,6 +185,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}
+ // allow users to specify some environment variables
+ Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
// Add each SPARK-* key to the environment
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
@@ -221,6 +223,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add Xmx for am memory
JAVA_OPTS += "-Xmx" + amMemory + "m "
+ JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
+ YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+
+
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is that the default GC for server-class machines ends up using all cores to do GC - hence if there are multiple containers on the same
// node, Spark GC affects all other containers' performance (which can also be other Spark containers)
@@ -241,7 +247,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Command for the ApplicationMaster
var javaCommand = "java";
val javaHome = System.getenv("JAVA_HOME")
- if (javaHome != null && !javaHome.isEmpty()) {
+ if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
diff --git a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
index f458f2f6a1..0e1fd9b680 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -75,6 +75,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
+
+ JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
+ YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is that the default GC for server-class machines ends up using all cores to do GC - hence if there are multiple containers on the same
// node, Spark GC affects all other containers' performance (which can also be other Spark containers)
@@ -104,7 +108,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
var javaCommand = "java";
val javaHome = System.getenv("JAVA_HOME")
- if (javaHome != null && !javaHome.isEmpty()) {
+ if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
@@ -187,6 +191,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
Client.populateHadoopClasspath(yarnConf, env)
+ // allow users to specify some environment variables
+ Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
return env
}