aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitriy Lyubimov <dlyubimov@apache.org>2013-07-31 22:09:22 -0700
committerDmitriy Lyubimov <dlyubimov@apache.org>2013-07-31 22:09:22 -0700
commitcb6be5bd7eab8b3cf682a6d0347b87d216d43939 (patch)
tree5071c649b24b37e5669571981da8100aaec3aaf8
parent28f1550f0134bad1391682135b9bfc43cb19fc01 (diff)
parent3097d75d6f5b93cac851dd6f43faed5a492b2676 (diff)
downloadspark-cb6be5bd7eab8b3cf682a6d0347b87d216d43939.tar.gz
spark-cb6be5bd7eab8b3cf682a6d0347b87d216d43939.tar.bz2
spark-cb6be5bd7eab8b3cf682a6d0347b87d216d43939.zip
Merge remote-tracking branch 'mesos/master' into SPARK-826
Conflicts: core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala core/src/test/scala/spark/KryoSerializerSuite.scala
-rw-r--r--assembly/README13
-rw-r--r--assembly/pom.xml92
-rw-r--r--assembly/src/main/assembly/assembly.xml68
-rwxr-xr-xbin/compute-classpath.sh11
-rw-r--r--conf/metrics.properties.template87
-rw-r--r--core/pom.xml22
-rw-r--r--core/src/main/scala/spark/Cache.scala80
-rw-r--r--core/src/main/scala/spark/KryoSerializer.scala137
-rw-r--r--core/src/main/scala/spark/Partitioner.scala14
-rw-r--r--core/src/main/scala/spark/SparkContext.scala15
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala11
-rw-r--r--core/src/main/scala/spark/Utils.scala11
-rw-r--r--core/src/main/scala/spark/api/python/PythonPartitioner.scala25
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala64
-rw-r--r--core/src/main/scala/spark/api/python/PythonWorkerFactory.scala4
-rw-r--r--core/src/main/scala/spark/broadcast/HttpBroadcast.scala36
-rw-r--r--core/src/main/scala/spark/deploy/DeployMessage.scala146
-rw-r--r--core/src/main/scala/spark/deploy/JsonProtocol.scala11
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala21
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala47
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterSource.scala25
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala11
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/IndexPage.scala16
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala19
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala38
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerSource.scala34
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala23
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala6
-rw-r--r--core/src/main/scala/spark/executor/ExecutorSource.scala30
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala17
-rw-r--r--core/src/main/scala/spark/io/CompressionCodec.scala82
-rw-r--r--core/src/main/scala/spark/metrics/MetricsConfig.scala96
-rw-r--r--core/src/main/scala/spark/metrics/MetricsSystem.scala146
-rw-r--r--core/src/main/scala/spark/metrics/sink/ConsoleSink.scala59
-rw-r--r--core/src/main/scala/spark/metrics/sink/CsvSink.scala68
-rw-r--r--core/src/main/scala/spark/metrics/sink/JmxSink.scala (renamed from core/src/main/scala/spark/SoftReferenceCache.scala)24
-rw-r--r--core/src/main/scala/spark/metrics/sink/Sink.scala23
-rw-r--r--core/src/main/scala/spark/metrics/source/JvmSource.scala32
-rw-r--r--core/src/main/scala/spark/metrics/source/Source.scala25
-rw-r--r--core/src/main/scala/spark/rdd/PartitionPruningRDD.scala5
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala38
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala30
-rw-r--r--core/src/main/scala/spark/scheduler/JobLogger.scala16
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala15
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListener.scala9
-rw-r--r--core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala168
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala61
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala9
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala11
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala22
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala22
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala89
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala27
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala17
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala18
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala1
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMasterActor.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMessages.scala163
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerSource.scala48
-rw-r--r--core/src/main/scala/spark/storage/BlockMessage.scala1
-rw-r--r--core/src/main/scala/spark/storage/BlockMessageArray.scala5
-rw-r--r--core/src/main/scala/spark/storage/BlockObjectWriter.scala2
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala1
-rw-r--r--core/src/main/scala/spark/ui/exec/ExecutorsUI.scala27
-rw-r--r--core/src/main/scala/spark/ui/jobs/IndexPage.scala80
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala66
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala54
-rw-r--r--core/src/main/scala/spark/util/Vector.scala5
-rw-r--r--core/src/test/resources/test_metrics_config.properties6
-rw-r--r--core/src/test/resources/test_metrics_system.properties7
-rw-r--r--core/src/test/scala/spark/FailureSuite.scala39
-rw-r--r--core/src/test/scala/spark/KryoSerializerSuite.scala14
-rw-r--r--core/src/test/scala/spark/PartitionPruningRDDSuite.scala28
-rw-r--r--core/src/test/scala/spark/io/CompressionCodecSuite.scala62
-rw-r--r--core/src/test/scala/spark/metrics/MetricsConfigSuite.scala64
-rw-r--r--core/src/test/scala/spark/metrics/MetricsSystemSuite.scala39
-rw-r--r--core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala2
-rw-r--r--core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala13
-rw-r--r--docs/configuration.md29
-rw-r--r--docs/python-programming-guide.md34
-rwxr-xr-xec2/spark_ec2.py39
-rwxr-xr-xmake-distribution.sh1
-rw-r--r--mllib/pom.xml165
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala21
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala (renamed from mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala)94
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/SVM.scala241
-rw-r--r--mllib/src/main/scala/spark/mllib/optimization/Gradient.scala45
-rw-r--r--mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala20
-rw-r--r--mllib/src/main/scala/spark/mllib/optimization/Updater.scala52
-rw-r--r--mllib/src/main/scala/spark/mllib/recommendation/ALS.scala5
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/Lasso.scala241
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/RegressionModel.scala (renamed from mllib/src/main/scala/spark/mllib/regression/Regression.scala)2
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala5
-rw-r--r--mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala45
-rw-r--r--mllib/src/main/scala/spark/mllib/util/MLUtils.scala4
-rw-r--r--mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala48
-rw-r--r--mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala (renamed from mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala)56
-rw-r--r--mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala116
-rw-r--r--mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala127
-rw-r--r--pom.xml48
-rw-r--r--project/SparkBuild.scala20
-rwxr-xr-xpyspark10
-rwxr-xr-xpython/examples/als.py5
-rwxr-xr-x[-rw-r--r--]python/examples/kmeans.py3
-rwxr-xr-xpython/examples/logistic_regression.py5
-rwxr-xr-x[-rw-r--r--]python/examples/pi.py3
-rwxr-xr-x[-rw-r--r--]python/examples/transitive_closure.py5
-rwxr-xr-x[-rw-r--r--]python/examples/wordcount.py3
-rw-r--r--python/pyspark/context.py11
-rw-r--r--python/pyspark/rdd.py28
-rwxr-xr-xpython/run-tests3
-rw-r--r--repl/src/main/scala/spark/repl/SparkILoop.scala4
-rwxr-xr-xrun18
-rwxr-xr-xspark-shell1
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala26
-rw-r--r--tools/pom.xml133
120 files changed, 3696 insertions, 1004 deletions
diff --git a/assembly/README b/assembly/README
new file mode 100644
index 0000000000..6ee2a536d7
--- /dev/null
+++ b/assembly/README
@@ -0,0 +1,13 @@
+This is an assembly module for Spark project.
+
+It creates a single tar.gz file that includes all needed dependency of the project
+except for org.apache.hadoop.* jars that are supposed to be available from the
+deployed Hadoop cluster.
+
+This module is off by default to avoid spending extra time on top of repl-bin
+module. To activate it specify the profile in the command line
+ -Passembly
+
+In case you want to avoid building time-expensive repl-bin module, that shaders
+all the dependency into a big flat jar supplement maven command with
+ -DnoExpensive
diff --git a/assembly/pom.xml b/assembly/pom.xml
new file mode 100644
index 0000000000..cc5a4875af
--- /dev/null
+++ b/assembly/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-assembly</artifactId>
+ <name>Spark Project Assembly</name>
+ <url>http://spark-project.org/</url>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>dist</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <properties>
+ <classifier.name>hadoop1</classifier.name>
+ </properties>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <properties>
+ <classifier.name>hadoop2</classifier.name>
+ </properties>
+ </profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <properties>
+ <classifier.name>hadoop2-yarn</classifier.name>
+ </properties>
+ </profile>
+ </profiles>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <classifier>${classifier.name}</classifier>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-bagel</artifactId>
+ <classifier>${classifier.name}</classifier>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-mllib</artifactId>
+ <classifier>${classifier.name}</classifier>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-repl</artifactId>
+ <classifier>${classifier.name}</classifier>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <classifier>${classifier.name}</classifier>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project> \ No newline at end of file
diff --git a/assembly/src/main/assembly/assembly.xml b/assembly/src/main/assembly/assembly.xml
new file mode 100644
index 0000000000..14485b7181
--- /dev/null
+++ b/assembly/src/main/assembly/assembly.xml
@@ -0,0 +1,68 @@
+<assembly>
+ <id>dist</id>
+ <formats>
+ <format>tar.gz</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <fileSet>
+ <includes>
+ <include>README</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>
+ ${project.parent.basedir}/core/src/main/resources/spark/ui/static/
+ </directory>
+ <outputDirectory>/ui-resources/spark/ui/static</outputDirectory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>
+ ${project.parent.basedir}/bin/
+ </directory>
+ <outputDirectory>/bin</outputDirectory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>
+ ${project.parent.basedir}
+ </directory>
+ <outputDirectory>/bin</outputDirectory>
+ <includes>
+ <include>run*</include>
+ <include>spark-shell*</include>
+ <include>spark-executor*</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <includes>
+ <include>org.spark-project:*:jar</include>
+ </includes>
+ <excludes>
+ <exclude>org.spark-project:spark-assembly:jar</exclude>
+ </excludes>
+ </dependencySet>
+ <dependencySet>
+ <outputDirectory>lib</outputDirectory>
+ <useTransitiveDependencies>true</useTransitiveDependencies>
+ <unpack>false</unpack>
+ <scope>runtime</scope>
+ <useProjectArtifact>false</useProjectArtifact>
+ <excludes>
+ <exclude>org.apache.hadoop:*:jar</exclude>
+ <exclude>org.spark-project:*:jar</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+
+</assembly>
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index eb270a5428..e4ce1ca848 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -76,17 +76,6 @@ function dev_classpath {
CLASSPATH="$CLASSPATH:$jar"
done
- # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
- # to avoid the -sources and -doc packages that are built by publish-local.
- if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the SBT build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
- fi
- if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the Maven build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
- fi
-
# Add Scala standard library
if [ -z "$SCALA_LIBRARY_PATH" ]; then
if [ -z "$SCALA_HOME" ]; then
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
new file mode 100644
index 0000000000..0486ca4c79
--- /dev/null
+++ b/conf/metrics.properties.template
@@ -0,0 +1,87 @@
+# syntax: [instance].[sink|source].[name].[options]
+
+# "instance" specify "who" (the role) use metrics system. In spark there are
+# several roles like master, worker, executor, driver, these roles will
+# create metrics system for monitoring. So instance represents these roles.
+# Currently in Spark, several instances have already implemented: master,
+# worker, executor, driver.
+#
+# [instance] field can be "master", "worker", "executor", "driver", which means
+# only the specified instance has this property.
+# a wild card "*" can be used to represent instance name, which means all the
+# instances will have this property.
+#
+# "source" specify "where" (source) to collect metrics data. In metrics system,
+# there exists two kinds of source:
+# 1. Spark internal source, like MasterSource, WorkerSource, etc, which will
+# collect Spark component's internal state, these sources are related to
+# instance and will be added after specific metrics system is created.
+# 2. Common source, like JvmSource, which will collect low level state, is
+# configured by configuration and loaded through reflection.
+#
+# "sink" specify "where" (destination) to output metrics data to. Several sinks
+# can be coexisted and flush metrics to all these sinks.
+#
+# [sink|source] field specify this property is source related or sink, this
+# field can only be source or sink.
+#
+# [name] field specify the name of source or sink, this is custom defined.
+#
+# [options] field is the specific property of this source or sink, this source
+# or sink is responsible for parsing this property.
+#
+# Notes:
+# 1. Sinks should be added through configuration, like console sink, class
+# full name should be specified by class property.
+# 2. Some sinks can specify polling period, like console sink, which is 10 seconds,
+# it should be attention minimal polling period is 1 seconds, any period
+# below than 1s is illegal.
+# 3. Wild card property can be overlapped by specific instance property, for
+# example, *.sink.console.period can be overlapped by master.sink.console.period.
+# 4. A metrics specific configuration
+# "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
+# added to Java property using -Dspark.metrics.conf=xxx if you want to
+# customize metrics system, or you can put it in ${SPARK_HOME}/conf,
+# metrics system will search and load it automatically.
+
+# Enable JmxSink for all instances by class name
+#*.sink.jmx.class=spark.metrics.sink.JmxSink
+
+# Enable ConsoleSink for all instances by class name
+#*.sink.console.class=spark.metrics.sink.ConsoleSink
+
+# Polling period for ConsoleSink
+#*.sink.console.period=10
+
+#*.sink.console.unit=seconds
+
+# Master instance overlap polling period
+#master.sink.console.period=15
+
+#master.sink.console.unit=seconds
+
+# Enable CsvSink for all instances
+#*.sink.csv.class=spark.metrics.sink.CsvSink
+
+# Polling period for CsvSink
+#*.sink.csv.period=1
+
+#*.sink.csv.unit=minutes
+
+# Polling directory for CsvSink
+#*.sink.csv.directory=/tmp/
+
+# Worker instance overlap polling period
+#worker.sink.csv.period=10
+
+#worker.sink.csv.unit=minutes
+
+# Enable jvm source for instance master, worker, driver and executor
+#master.source.jvm.class=spark.metrics.source.JvmSource
+
+#worker.source.jvm.class=spark.metrics.source.JvmSource
+
+#driver.source.jvm.class=spark.metrics.source.JvmSource
+
+#executor.source.jvm.class=spark.metrics.source.JvmSource
+
diff --git a/core/pom.xml b/core/pom.xml
index 6329b2fbd8..ba0071f582 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
<artifactId>compress-lzf</artifactId>
</dependency>
<dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
@@ -57,8 +61,14 @@
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
- <groupId>de.javakaffee</groupId>
- <artifactId>kryo-serializers</artifactId>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_2.9.3</artifactId>
+ <version>0.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-java</artifactId>
+ <version>0.3.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
@@ -108,6 +118,14 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
deleted file mode 100644
index b0c83ce59d..0000000000
--- a/core/src/main/scala/spark/Cache.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.concurrent.atomic.AtomicInteger
-
-private[spark] sealed trait CachePutResponse
-private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse
-private[spark] case class CachePutFailure() extends CachePutResponse
-
-/**
- * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
- * both partitions of cached RDDs and broadcast variables on Spark executors. Caches are also aware
- * of which entries are part of the same dataset (for example, partitions in the same RDD). The key
- * for each value in a cache is a (datasetID, partition) pair.
- *
- * A single Cache instance gets created on each machine and is shared by all caches (i.e. both the
- * RDD split cache and the broadcast variable cache), to enable global replacement policies.
- * However, because these several independent modules all perform caching, it is important to give
- * them separate key namespaces, so that an RDD and a broadcast variable (for example) do not use
- * the same key. For this purpose, Cache has the notion of KeySpaces. Each client module must first
- * ask for a KeySpace, and then call get() and put() on that space using its own keys.
- *
- * This abstract class handles the creation of key spaces, so that subclasses need only deal with
- * keys that are unique across modules.
- */
-private[spark] abstract class Cache {
- private val nextKeySpaceId = new AtomicInteger(0)
- private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()
-
- def newKeySpace() = new KeySpace(this, newKeySpaceId())
-
- /**
- * Get the value for a given (datasetId, partition), or null if it is not
- * found.
- */
- def get(datasetId: Any, partition: Int): Any
-
- /**
- * Attempt to put a value in the cache; returns CachePutFailure if this was
- * not successful (e.g. because the cache replacement policy forbids it), and
- * CachePutSuccess if successful. If size estimation is available, the cache
- * implementation should set the size field in CachePutSuccess.
- */
- def put(datasetId: Any, partition: Int, value: Any): CachePutResponse
-
- /**
- * Report the capacity of the cache partition. By default this just reports
- * zero. Specific implementations can choose to provide the capacity number.
- */
- def getCapacity: Long = 0L
-}
-
-/**
- * A key namespace in a Cache.
- */
-private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) {
- def get(datasetId: Any, partition: Int): Any =
- cache.get((keySpaceId, datasetId), partition)
-
- def put(datasetId: Any, partition: Int, value: Any): CachePutResponse =
- cache.put((keySpaceId, datasetId), partition, value)
-
- def getCapacity: Long = cache.getCapacity
-}
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index ee37da7948..eeb2993d8a 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -19,24 +19,16 @@ package spark
import java.io._
import java.nio.ByteBuffer
-import java.nio.channels.Channels
-
-import scala.collection.immutable
-import scala.collection.mutable
-
-import com.esotericsoftware.kryo._
-import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
-
+import com.twitter.chill.ScalaKryoInstantiator
import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
import spark.broadcast._
import spark.storage._
private[spark]
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-
val output = new KryoOutput(outStream)
def writeObject[T](t: T): SerializationStream = {
@@ -50,7 +42,6 @@ class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends Seria
private[spark]
class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-
val input = new KryoInput(inStream)
def readObject[T](): T = {
@@ -58,7 +49,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
kryo.readClassAndObject(input).asInstanceOf[T]
} catch {
// DeserializationStream uses the EOF exception to indicate stopping condition.
- case e: com.esotericsoftware.kryo.KryoException => throw new java.io.EOFException
+ case _: KryoException => throw new EOFException
}
}
@@ -69,10 +60,9 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
}
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-
- val kryo = ks.kryo.get()
- val output = ks.output.get()
- val input = ks.input.get()
+ val kryo = ks.newKryo()
+ val output = ks.newKryoOutput()
+ val input = ks.newKryoInput()
def serialize[T](t: T): ByteBuffer = {
output.clear()
@@ -108,125 +98,51 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
* serialization.
*/
trait KryoRegistrator {
- def registerClasses(kryo: Kryo): Unit
+ def registerClasses(kryo: Kryo)
}
/**
* A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
*/
class KryoSerializer extends spark.serializer.Serializer with Logging {
+ private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
- val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+ def newKryoOutput() = new KryoOutput(bufferSize)
- val kryo = new ThreadLocal[Kryo] {
- override def initialValue = createKryo()
- }
-
- val output = new ThreadLocal[KryoOutput] {
- override def initialValue = new KryoOutput(bufferSize)
- }
-
- val input = new ThreadLocal[KryoInput] {
- override def initialValue = new KryoInput(bufferSize)
- }
+ def newKryoInput() = new KryoInput(bufferSize)
- def createKryo(): Kryo = {
- val kryo = new KryoReflectionFactorySupport()
+ def newKryo(): Kryo = {
+ val instantiator = new ScalaKryoInstantiator
+ val kryo = instantiator.newKryo()
+ val classLoader = Thread.currentThread.getContextClassLoader
// Register some commonly used classes
val toRegister: Seq[AnyRef] = Seq(
- // Arrays
- Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), Array(("", "")),
- Array(new java.lang.Object), Array(1.toByte), Array(true), Array('c'),
- // Specialized Tuple2s
- ("", ""), ("", 1), (1, 1), (1.0, 1.0), (1L, 1L),
- (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1),
- // Scala collections
- List(1), mutable.ArrayBuffer(1),
- // Options and Either
- Some(1), Left(1), Right(1),
- // Higher-dimensional tuples
- (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1),
- None,
ByteBuffer.allocate(1),
StorageLevel.MEMORY_ONLY,
PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock("1", ByteBuffer.allocate(1)),
GetBlock("1")
)
- for (obj <- toRegister) {
- kryo.register(obj.getClass)
- }
+
+ for (obj <- toRegister) kryo.register(obj.getClass)
// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
- // Register some commonly used Scala singleton objects. Because these
- // are singletons, we must return the exact same local object when we
- // deserialize rather than returning a clone as FieldSerializer would.
- class SingletonSerializer[T](obj: T) extends KSerializer[T] {
- override def write(kryo: Kryo, output: KryoOutput, obj: T) {}
- override def read(kryo: Kryo, input: KryoInput, cls: java.lang.Class[T]): T = obj
- }
- kryo.register(None.getClass, new SingletonSerializer[AnyRef](None))
- kryo.register(Nil.getClass, new SingletonSerializer[AnyRef](Nil))
-
- // Register maps with a special serializer since they have complex internal structure
- class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
- extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
-
- //hack, look at https://groups.google.com/forum/#!msg/kryo-users/Eu5V4bxCfws/k-8UQ22y59AJ
- private final val FAKE_REFERENCE = new Object()
- override def write(
- kryo: Kryo,
- output: KryoOutput,
- obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
- val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
- output.writeInt(map.size)
- for ((k, v) <- map) {
- kryo.writeClassAndObject(output, k)
- kryo.writeClassAndObject(output, v)
- }
- }
- override def read (
- kryo: Kryo,
- input: KryoInput,
- cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
- : Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
- kryo.reference(FAKE_REFERENCE)
- val size = input.readInt()
- val elems = new Array[(Any, Any)](size)
- for (i <- 0 until size) {
- val k = kryo.readClassAndObject(input)
- val v = kryo.readClassAndObject(input)
- elems(i)=(k,v)
- }
- buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
+ // Allow the user to register their own classes by setting spark.kryo.registrator
+ try {
+ Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+ logDebug("Running user registrator: " + regCls)
+ val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+ reg.registerClasses(kryo)
}
+ } catch {
+ case _: Exception => println("Failed to register spark.kryo.registrator")
}
- kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _))
- // TODO: add support for immutable maps too; this is more annoying because there are many
- // subclasses of immutable.Map for small maps (with <= 4 entries)
- val map1 = Map[Any, Any](1 -> 1)
- val map2 = Map[Any, Any](1 -> 1, 2 -> 2)
- val map3 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3)
- val map4 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
- val map5 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4, 5 -> 5)
- kryo.register(map1.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map2.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map3.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map4.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map5.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- // Allow the user to register their own classes by setting spark.kryo.registrator
- val regCls = System.getProperty("spark.kryo.registrator")
- if (regCls != null) {
- logInfo("Running user registrator: " + regCls)
- val classLoader = Thread.currentThread.getContextClassLoader
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
- reg.registerClasses(kryo)
- }
+ kryo.setClassLoader(classLoader)
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
@@ -235,7 +151,6 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
}
def newInstance(): SerializerInstance = {
- this.kryo.get().setClassLoader(Thread.currentThread().getContextClassLoader)
new KryoSerializerInstance(this)
}
-}
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 660af70d52..6035bc075e 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -65,17 +65,9 @@ object Partitioner {
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
- def getPartition(key: Any): Int = {
- if (key == null) {
- return 0
- } else {
- val mod = key.hashCode % partitions
- if (mod < 0) {
- mod + partitions
- } else {
- mod // Guard against negative hash codes
- }
- }
+ def getPartition(key: Any): Int = key match {
+ case null => 0
+ case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 24ba605646..77cb0ee0cd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,13 +60,14 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI}
+import spark.metrics._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -270,6 +271,16 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
+ val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+ val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+
+ def initDriverMetrics() {
+ SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+ SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
+ }
+
+ initDriverMetrics()
+
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD. */
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index f2bdc11bdb..4a1d341f5d 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -24,6 +24,7 @@ import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
import akka.remote.RemoteActorRefProvider
import spark.broadcast.BroadcastManager
+import spark.metrics.MetricsSystem
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
@@ -53,6 +54,7 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
+ val metricsSystem: MetricsSystem,
// To be set only as part of initialization of SparkContext.
// (executorId, defaultHostPort) => executorHostPort
// If executorId is NOT found, return defaultHostPort
@@ -68,6 +70,7 @@ class SparkEnv (
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
+ metricsSystem.stop()
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
@@ -184,6 +187,13 @@ object SparkEnv extends Logging {
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ val metricsSystem = if (isDriver) {
+ MetricsSystem.createMetricsSystem("driver")
+ } else {
+ MetricsSystem.createMetricsSystem("executor")
+ }
+ metricsSystem.start()
+
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
@@ -213,6 +223,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
+ metricsSystem,
None)
}
}
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 8e6fc66955..673f9a810d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -638,7 +638,7 @@ private object Utils extends Logging {
output.toString
}
- /**
+ /**
* A regular expression to match classes of the "core" Spark API that we want to skip when
* finding the call site of a method.
*/
@@ -798,4 +798,13 @@ private object Utils extends Logging {
}
return buf
}
+
+ /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
+ * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
+ * so function return (x % mod) + mod in that case.
+ */
+ def nonNegativeMod(x: Int, mod: Int): Int = {
+ val rawMod = x % mod
+ rawMod + (if (rawMod < 0) mod else 0)
+ }
}
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 31a719fbff..ac112b8c2c 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -18,7 +18,7 @@
package spark.api.python
import spark.Partitioner
-
+import spark.Utils
import java.util.Arrays
/**
@@ -35,25 +35,10 @@ private[spark] class PythonPartitioner(
val pyPartitionFunctionId: Long)
extends Partitioner {
- override def getPartition(key: Any): Int = {
- if (key == null) {
- return 0
- }
- else {
- val hashCode = {
- if (key.isInstanceOf[Array[Byte]]) {
- Arrays.hashCode(key.asInstanceOf[Array[Byte]])
- } else {
- key.hashCode()
- }
- }
- val mod = hashCode % numPartitions
- if (mod < 0) {
- mod + numPartitions
- } else {
- mod // Guard against negative hash codes
- }
- }
+ override def getPartition(key: Any): Int = key match {
+ case null => 0
+ case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
+ case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
}
override def equals(other: Any): Boolean = other match {
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index af10822dbd..2dd79f7100 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -63,34 +63,42 @@ private[spark] class PythonRDD[T: ClassManifest](
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + pythonExec) {
override def run() {
- SparkEnv.set(env)
- val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
- val dataOut = new DataOutputStream(stream)
- val printOut = new PrintWriter(stream)
- // Partition index
- dataOut.writeInt(split.index)
- // sparkFilesDir
- PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut)
- // Broadcast variables
- dataOut.writeInt(broadcastVars.length)
- for (broadcast <- broadcastVars) {
- dataOut.writeLong(broadcast.id)
- dataOut.writeInt(broadcast.value.length)
- dataOut.write(broadcast.value)
- }
- dataOut.flush()
- // Serialized user code
- for (elem <- command) {
- printOut.println(elem)
- }
- printOut.flush()
- // Data values
- for (elem <- parent.iterator(split, context)) {
- PythonRDD.writeAsPickle(elem, dataOut)
+ try {
+ SparkEnv.set(env)
+ val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
+ val dataOut = new DataOutputStream(stream)
+ val printOut = new PrintWriter(stream)
+ // Partition index
+ dataOut.writeInt(split.index)
+ // sparkFilesDir
+ PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut)
+ // Broadcast variables
+ dataOut.writeInt(broadcastVars.length)
+ for (broadcast <- broadcastVars) {
+ dataOut.writeLong(broadcast.id)
+ dataOut.writeInt(broadcast.value.length)
+ dataOut.write(broadcast.value)
+ }
+ dataOut.flush()
+ // Serialized user code
+ for (elem <- command) {
+ printOut.println(elem)
+ }
+ printOut.flush()
+ // Data values
+ for (elem <- parent.iterator(split, context)) {
+ PythonRDD.writeAsPickle(elem, dataOut)
+ }
+ dataOut.flush()
+ printOut.flush()
+ worker.shutdownOutput()
+ } catch {
+ case e: IOException =>
+ // This can happen for legitimate reasons if the Python code stops returning data before we are done
+ // passing elements through, e.g., for take(). Just log a message to say it happened.
+ logInfo("stdin writer to Python finished early")
+ logDebug("stdin writer to Python finished early", e)
}
- dataOut.flush()
- printOut.flush()
- worker.shutdownOutput()
}
}.start()
@@ -297,7 +305,7 @@ class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
Utils.checkHost(serverHost, "Expected hostname")
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
+
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]])
diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
index 078ad45ce8..14f8320678 100644
--- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
@@ -17,7 +17,7 @@
package spark.api.python
-import java.io.{DataInputStream, IOException}
+import java.io.{File, DataInputStream, IOException}
import java.net.{Socket, SocketException, InetAddress}
import scala.collection.JavaConversions._
@@ -67,6 +67,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+ workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index c565876950..138a8c21bc 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -17,21 +17,20 @@
package spark.broadcast
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
-import java.io._
-import java.net._
-import java.util.UUID
+import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
+import java.net.URL
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import spark._
+import spark.{HttpServer, Logging, SparkEnv, Utils}
+import spark.io.CompressionCodec
import spark.storage.StorageLevel
-import util.{MetadataCleaner, TimeStampedHashSet}
+import spark.util.{MetadataCleaner, TimeStampedHashSet}
+
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
-extends Broadcast[T](id) with Logging with Serializable {
+ extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@@ -85,6 +84,7 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+ private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) {
synchronized {
@@ -122,10 +122,12 @@ private object HttpBroadcast extends Logging {
def write(id: Long, value: Any) {
val file = new File(broadcastDir, "broadcast-" + id)
- val out: OutputStream = if (compress) {
- new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
- } else {
- new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ val out: OutputStream = {
+ if (compress) {
+ compressionCodec.compressedOutputStream(new FileOutputStream(file))
+ } else {
+ new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ }
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
@@ -136,10 +138,12 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = {
val url = serverUri + "/broadcast-" + id
- var in = if (compress) {
- new LZFInputStream(new URL(url).openStream()) // Does its own buffering
- } else {
- new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ val in = {
+ if (compress) {
+ compressionCodec.compressedInputStream(new URL(url).openStream())
+ } else {
+ new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ }
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index e1f8aff6f5..7c37a16615 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -17,109 +17,107 @@
package spark.deploy
+import scala.collection.immutable.List
+
+import spark.Utils
import spark.deploy.ExecutorState.ExecutorState
import spark.deploy.master.{WorkerInfo, ApplicationInfo}
import spark.deploy.worker.ExecutorRunner
-import scala.collection.immutable.List
-import spark.Utils
-private[spark] sealed trait DeployMessage extends Serializable
+private[deploy] sealed trait DeployMessage extends Serializable
-// Worker to Master
+private[deploy] object DeployMessages {
-private[spark]
-case class RegisterWorker(
- id: String,
- host: String,
- port: Int,
- cores: Int,
- memory: Int,
- webUiPort: Int,
- publicAddress: String)
- extends DeployMessage {
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
-}
+ // Worker to Master
-private[spark]
-case class ExecutorStateChanged(
- appId: String,
- execId: Int,
- state: ExecutorState,
- message: Option[String],
- exitStatus: Option[Int])
- extends DeployMessage
+ case class RegisterWorker(
+ id: String,
+ host: String,
+ port: Int,
+ cores: Int,
+ memory: Int,
+ webUiPort: Int,
+ publicAddress: String)
+ extends DeployMessage {
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
-private[spark] case class Heartbeat(workerId: String) extends DeployMessage
+ case class ExecutorStateChanged(
+ appId: String,
+ execId: Int,
+ state: ExecutorState,
+ message: Option[String],
+ exitStatus: Option[Int])
+ extends DeployMessage
-// Master to Worker
+ case class Heartbeat(workerId: String) extends DeployMessage
-private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage
-private[spark] case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+ // Master to Worker
-private[spark] case class LaunchExecutor(
- appId: String,
- execId: Int,
- appDesc: ApplicationDescription,
- cores: Int,
- memory: Int,
- sparkHome: String)
- extends DeployMessage
+ case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-// Client to Master
+ case class RegisterWorkerFailed(message: String) extends DeployMessage
-private[spark] case class RegisterApplication(appDescription: ApplicationDescription)
- extends DeployMessage
+ case class KillExecutor(appId: String, execId: Int) extends DeployMessage
-// Master to Client
+ case class LaunchExecutor(
+ appId: String,
+ execId: Int,
+ appDesc: ApplicationDescription,
+ cores: Int,
+ memory: Int,
+ sparkHome: String)
+ extends DeployMessage
-private[spark]
-case class RegisteredApplication(appId: String) extends DeployMessage
+ // Client to Master
-private[spark]
-case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
- Utils.checkHostPort(hostPort, "Required hostport")
-}
+ case class RegisterApplication(appDescription: ApplicationDescription)
+ extends DeployMessage
-private[spark]
-case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
- exitStatus: Option[Int])
+ // Master to Client
-private[spark]
-case class ApplicationRemoved(message: String)
+ case class RegisteredApplication(appId: String) extends DeployMessage
-// Internal message in Client
+ case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ Utils.checkHostPort(hostPort, "Required hostport")
+ }
-private[spark] case object StopClient
+ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+ exitStatus: Option[Int])
-// MasterWebUI To Master
+ case class ApplicationRemoved(message: String)
-private[spark] case object RequestMasterState
+ // Internal message in Client
-// Master to MasterWebUI
+ case object StopClient
-private[spark]
-case class MasterState(host: String, port: Int, workers: Array[WorkerInfo],
- activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+ // MasterWebUI To Master
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
+ case object RequestMasterState
- def uri = "spark://" + host + ":" + port
-}
+ // Master to MasterWebUI
+
+ case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
+ activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+
+ def uri = "spark://" + host + ":" + port
+ }
-// WorkerWebUI to Worker
-private[spark] case object RequestWorkerState
+ // WorkerWebUI to Worker
+ case object RequestWorkerState
-// Worker to WorkerWebUI
+ // Worker to WorkerWebUI
-private[spark]
-case class WorkerState(host: String, port: Int, workerId: String, executors: List[ExecutorRunner],
- finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
- coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
+ case class WorkerStateResponse(host: String, port: Int, workerId: String,
+ executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
+ cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
}
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index 64f89623e1..bd1db7c294 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -17,9 +17,12 @@
package spark.deploy
-import master.{ApplicationInfo, WorkerInfo}
import net.liftweb.json.JsonDSL._
-import worker.ExecutorRunner
+
+import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
+import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import spark.deploy.worker.ExecutorRunner
+
private[spark] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo) = {
@@ -57,7 +60,7 @@ private[spark] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
- def writeMasterState(obj: MasterState) = {
+ def writeMasterState(obj: MasterStateResponse) = {
("url" -> ("spark://" + obj.uri)) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
@@ -68,7 +71,7 @@ private[spark] object JsonProtocol {
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
}
- def writeWorkerState(obj: WorkerState) = {
+ def writeWorkerState(obj: WorkerStateResponse) = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
("masterwebuiurl" -> obj.masterWebUiUrl) ~
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index edefa0292d..9d5ba8a796 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -17,21 +17,23 @@
package spark.deploy.client
-import spark.deploy._
+import java.util.concurrent.TimeoutException
+
import akka.actor._
+import akka.actor.Terminated
import akka.pattern.ask
import akka.util.Duration
-import akka.util.duration._
-import akka.pattern.AskTimeoutException
-import spark.{SparkException, Logging}
+import akka.remote.RemoteClientDisconnected
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
-import spark.deploy.RegisterApplication
-import spark.deploy.master.Master
-import akka.remote.RemoteClientDisconnected
-import akka.actor.Terminated
import akka.dispatch.Await
+import spark.Logging
+import spark.deploy.{ApplicationDescription, ExecutorState}
+import spark.deploy.DeployMessages._
+import spark.deploy.master.Master
+
+
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
@@ -134,7 +136,8 @@ private[spark] class Client(
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
- case e: AskTimeoutException => // Ignore it, maybe master went away
+ case e: TimeoutException =>
+ logInfo("Stop request to Master timed out; it may already be shut down.")
}
actor = null
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index eddcafd84d..202d5bcdb7 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -17,20 +17,22 @@
package spark.deploy.master
-import akka.actor._
-import akka.actor.Terminated
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
-
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import spark.deploy._
+import akka.actor._
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
+
import spark.{Logging, SparkException, Utils}
+import spark.deploy.{ApplicationDescription, ExecutorState}
+import spark.deploy.DeployMessages._
+import spark.deploy.master.ui.MasterWebUI
+import spark.metrics.MetricsSystem
import spark.util.AkkaUtils
-import ui.MasterWebUI
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -57,6 +59,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
+ val metricsSystem = MetricsSystem.createMetricsSystem("master")
+ val masterSource = new MasterSource(this)
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
@@ -73,10 +78,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+
+ metricsSystem.registerSource(masterSource)
+ metricsSystem.start()
}
override def postStop() {
webUi.stop()
+ metricsSystem.stop()
}
override def receive = {
@@ -160,7 +169,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
- sender ! MasterState(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
}
}
@@ -225,20 +234,27 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
- exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
+ worker.actor ! LaunchExecutor(
+ exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+ exec.application.driver ! ExecutorAdded(
+ exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
publicAddress: String): WorkerInfo = {
- // There may be one or more refs to dead workers on this same node (w/ different ID's), remove them.
- workers.filter(w => (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
+ // There may be one or more refs to dead workers on this same node (w/ different ID's),
+ // remove them.
+ workers.filter { w =>
+ (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
+ }.foreach { w =>
+ workers -= w
+ }
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
addressToWorker(sender.path.address) = worker
- return worker
+ worker
}
def removeWorker(worker: WorkerInfo) {
@@ -249,7 +265,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
- exec.application.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
+ exec.application.driver ! ExecutorUpdated(
+ exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
}
@@ -269,7 +286,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
}
- return app
+ app
}
def finishApplication(app: ApplicationInfo) {
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
new file mode 100644
index 0000000000..b8cfa6a773
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/MasterSource.scala
@@ -0,0 +1,25 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class MasterSource(val master: Master) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "master"
+
+ // Gauge for worker numbers in cluster
+ metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+ override def getValue: Int = master.workers.size
+ })
+
+ // Gauge for application numbers in cluster
+ metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.apps.size
+ })
+
+ // Gauge for waiting application numbers in cluster
+ metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.waitingApps.size
+ })
+}
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 32264af393..b4c62bc224 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,6 +17,8 @@
package spark.deploy.master.ui
+import scala.xml.Node
+
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
@@ -25,9 +27,8 @@ import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
-import scala.xml.Node
-
-import spark.deploy.{RequestMasterState, JsonProtocol, MasterState}
+import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import spark.deploy.JsonProtocol
import spark.deploy.master.ExecutorInfo
import spark.ui.UIUtils
@@ -38,7 +39,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
@@ -49,7 +50,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index b05197c1b9..557df89b41 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -17,18 +17,20 @@
package spark.deploy.master.ui
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.duration._
-
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import spark.deploy.{RequestMasterState, DeployWebUI, MasterState}
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+
import spark.Utils
-import spark.ui.UIUtils
+import spark.deploy.DeployWebUI
+import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import spark.ui.UIUtils
+
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.master
@@ -36,7 +38,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 8b51ff1c3a..345dfe879c 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -19,14 +19,12 @@ package spark.deploy.worker
import java.io._
import java.lang.System.getenv
-import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription}
+
import akka.actor.ActorRef
+
import spark.{Utils, Logging}
-import java.net.{URI, URL}
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.conf.Configuration
-import scala.Some
-import spark.deploy.ExecutorStateChanged
+import spark.deploy.{ExecutorState, ApplicationDescription}
+import spark.deploy.DeployMessages.ExecutorStateChanged
/**
* Manages the execution of one executor process.
@@ -51,6 +49,9 @@ private[spark] class ExecutorRunner(
var process: Process = null
var shutdownHook: Thread = null
+ private def getAppEnv(key: String): Option[String] =
+ appDesc.command.environment.get(key).orElse(Option(getenv(key)))
+
def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
@@ -95,7 +96,7 @@ private[spark] class ExecutorRunner(
def buildCommandSeq(): Seq[String] = {
val command = appDesc.command
- val runner = Option(getenv("JAVA_HOME")).map(_ + "/bin/java").getOrElse("java")
+ val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
@@ -107,10 +108,10 @@ private[spark] class ExecutorRunner(
* the way the JAVA_OPTS are assembled there.
*/
def buildJavaOpts(): Seq[String] = {
- val libraryOpts = Option(getenv("SPARK_LIBRARY_PATH"))
+ val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
.map(p => List("-Djava.library.path=" + p))
.getOrElse(Nil)
- val userOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+ val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
// Figure out our classpath with the external compute-classpath script
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0bd88ea253..0e46fa281e 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -17,21 +17,24 @@
package spark.deploy.worker
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.io.File
+
+import scala.collection.mutable.HashMap
+
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
+
import spark.{Logging, Utils}
-import spark.util.AkkaUtils
-import spark.deploy._
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import java.text.SimpleDateFormat
-import java.util.Date
-import spark.deploy.RegisterWorker
-import spark.deploy.LaunchExecutor
-import spark.deploy.RegisterWorkerFailed
+import spark.deploy.ExecutorState
+import spark.deploy.DeployMessages._
import spark.deploy.master.Master
-import java.io.File
-import ui.WorkerWebUI
+import spark.deploy.worker.ui.WorkerWebUI
+import spark.metrics.MetricsSystem
+import spark.util.AkkaUtils
+
private[spark] class Worker(
host: String,
@@ -67,6 +70,9 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
+ val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+ val workerSource = new WorkerSource(this)
+
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
@@ -97,6 +103,9 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
+
+ metricsSystem.registerSource(workerSource)
+ metricsSystem.start()
}
def connectToMaster() {
@@ -155,10 +164,10 @@ private[spark] class Worker(
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
-
+
case RequestWorkerState => {
- sender ! WorkerState(host, port, workerId, executors.values.toList,
- finishedExecutors.values.toList, masterUrl, cores, memory,
+ sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
+ finishedExecutors.values.toList, masterUrl, cores, memory,
coresUsed, memoryUsed, masterWebUiUrl)
}
}
@@ -178,6 +187,7 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
webUi.stop()
+ metricsSystem.stop()
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
new file mode 100644
index 0000000000..39cb8e5690
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
@@ -0,0 +1,34 @@
+package spark.deploy.worker
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class WorkerSource(val worker: Worker) extends Source {
+ val sourceName = "worker"
+ val metricRegistry = new MetricRegistry()
+
+ metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.executors.size
+ })
+
+ // Gauge for cores used of this worker
+ metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.coresUsed
+ })
+
+ // Gauge for memory used of this worker
+ metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+ override def getValue: Int = worker.memoryUsed
+ })
+
+ // Gauge for cores free of this worker
+ metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.coresFree
+ })
+
+ // Gauge for memory free of this worker
+ metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+ override def getValue: Int = worker.memoryFree
+ })
+}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index 7548a26c2e..1619c6a4c2 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -17,34 +17,36 @@
package spark.deploy.worker.ui
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import javax.servlet.http.HttpServletRequest
-
import net.liftweb.json.JsonAST.JValue
-import scala.xml.Node
-
-import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState}
-import spark.deploy.worker.ExecutorRunner
import spark.Utils
+import spark.deploy.JsonProtocol
+import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
+import spark.deploy.worker.ExecutorRunner
import spark.ui.UIUtils
+
private[spark] class IndexPage(parent: WorkerWebUI) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
- val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
@@ -69,7 +71,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
</div>
</div>
- <hr/>
+ <hr/>
<div class="row"> <!-- Running Executors -->
<div class="span12">
@@ -88,7 +90,8 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</div>
</div>;
- UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
+ UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(
+ workerState.host, workerState.port))
}
def executorRow(executor: ExecutorRunner): Seq[Node] = {
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 2e81151882..8a74a8d853 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -69,7 +69,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)
-
+
// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
@@ -87,9 +87,13 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
)
+ val executorSource = new ExecutorSource(this)
+
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
+ env.metricsSystem.registerSource(executorSource)
+
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
new file mode 100644
index 0000000000..94116edfcf
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -0,0 +1,30 @@
+package spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ExecutorSource(val executor: Executor) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "executor"
+
+ // Gauge for executor thread pool's actively executing task counts
+ metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getActiveCount()
+ })
+
+ // Gauge for executor thread pool's approximate total number of tasks that have been completed
+ metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+ override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+ })
+
+ // Gauge for executor thread pool's current number of threads
+ metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getPoolSize()
+ })
+
+ // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
+ metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+ })
+}
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index f4003da732..e47fe50021 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -18,19 +18,16 @@
package spark.executor
import java.nio.ByteBuffer
-import spark.Logging
-import spark.TaskState.TaskState
-import spark.util.AkkaUtils
+
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredExecutor
-import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterExecutorFailed
-import spark.scheduler.cluster.RegisterExecutor
-import spark.Utils
+
+import spark.{Logging, Utils}
+import spark.TaskState.TaskState
import spark.deploy.SparkHadoopUtil
+import spark.scheduler.cluster.StandaloneClusterMessages._
+import spark.util.AkkaUtils
+
private[spark] class StandaloneExecutorBackend(
driverUrl: String,
diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala
new file mode 100644
index 0000000000..0adebecadb
--- /dev/null
+++ b/core/src/main/scala/spark/io/CompressionCodec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.io
+
+import java.io.{InputStream, OutputStream}
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+
+/**
+ * CompressionCodec allows the customization of choosing different compression implementations
+ * to be used in block storage.
+ */
+trait CompressionCodec {
+
+ def compressedOutputStream(s: OutputStream): OutputStream
+
+ def compressedInputStream(s: InputStream): InputStream
+}
+
+
+private[spark] object CompressionCodec {
+
+ def createCodec(): CompressionCodec = {
+ // Set the default codec to Snappy since the LZF implementation initializes a pretty large
+ // buffer for every stream, which results in a lot of memory overhead when the number of
+ // shuffle reduce buckets are large.
+ createCodec(classOf[SnappyCompressionCodec].getName)
+ }
+
+ def createCodec(codecName: String): CompressionCodec = {
+ Class.forName(
+ System.getProperty("spark.io.compression.codec", codecName),
+ true,
+ Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
+ }
+}
+
+
+/**
+ * LZF implementation of [[spark.io.CompressionCodec]].
+ */
+class LZFCompressionCodec extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ new LZFOutputStream(s).setFinishBlockOnFlush(true)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
+}
+
+
+/**
+ * Snappy implementation of [[spark.io.CompressionCodec]].
+ * Block size can be configured by spark.io.compression.snappy.block.size.
+ */
+class SnappyCompressionCodec extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+ new SnappyOutputStream(s, blockSize)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
+}
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
new file mode 100644
index 0000000000..3e32e9c82f
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileInputStream, InputStream, IOException}
+
+import scala.collection.mutable
+import scala.util.matching.Regex
+
+import spark.Logging
+
+private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+ initLogging()
+
+ val DEFAULT_PREFIX = "*"
+ val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+ val METRICS_CONF = "metrics.properties"
+
+ val properties = new Properties()
+ var propertyCategories: mutable.HashMap[String, Properties] = null
+
+ private def setDefaultProperties(prop: Properties) {
+ // empty function, any default property can be set here
+ }
+
+ def initialize() {
+ //Add default properties in case there's no properties file
+ setDefaultProperties(properties)
+
+ // If spark.metrics.conf is not set, try to get file in class path
+ var is: InputStream = null
+ try {
+ is = configFile match {
+ case Some(f) => new FileInputStream(f)
+ case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
+ }
+
+ if (is != null) {
+ properties.load(is)
+ }
+ } catch {
+ case e: Exception => logError("Error loading configure file", e)
+ } finally {
+ if (is != null) is.close()
+ }
+
+ propertyCategories = subProperties(properties, INSTANCE_REGEX)
+ if (propertyCategories.contains(DEFAULT_PREFIX)) {
+ import scala.collection.JavaConversions._
+
+ val defaultProperty = propertyCategories(DEFAULT_PREFIX)
+ for { (inst, prop) <- propertyCategories
+ if (inst != DEFAULT_PREFIX)
+ (k, v) <- defaultProperty
+ if (prop.getProperty(k) == null) } {
+ prop.setProperty(k, v)
+ }
+ }
+ }
+
+ def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
+ val subProperties = new mutable.HashMap[String, Properties]
+ import scala.collection.JavaConversions._
+ prop.foreach { kv =>
+ if (regex.findPrefixOf(kv._1) != None) {
+ val regex(prefix, suffix) = kv._1
+ subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+ }
+ }
+ subProperties
+ }
+
+ def getInstance(inst: String): Properties = {
+ propertyCategories.get(inst) match {
+ case Some(s) => s
+ case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
+ }
+ }
+}
+
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
new file mode 100644
index 0000000000..fabddfb947
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics
+
+import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import spark.Logging
+import spark.metrics.sink.Sink
+import spark.metrics.source.Source
+
+/**
+ * Spark Metrics System, created by specific "instance", combined by source,
+ * sink, periodically poll source metrics data to sink destinations.
+ *
+ * "instance" specify "who" (the role) use metrics system. In spark there are several roles
+ * like master, worker, executor, client driver, these roles will create metrics system
+ * for monitoring. So instance represents these roles. Currently in Spark, several instances
+ * have already implemented: master, worker, executor, driver.
+ *
+ * "source" specify "where" (source) to collect metrics data. In metrics system, there exists
+ * two kinds of source:
+ * 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect
+ * Spark component's internal state, these sources are related to instance and will be
+ * added after specific metrics system is created.
+ * 2. Common source, like JvmSource, which will collect low level state, is configured by
+ * configuration and loaded through reflection.
+ *
+ * "sink" specify "where" (destination) to output metrics data to. Several sinks can be
+ * coexisted and flush metrics to all these sinks.
+ *
+ * Metrics configuration format is like below:
+ * [instance].[sink|source].[name].[options] = xxxx
+ *
+ * [instance] can be "master", "worker", "executor", "driver", which means only the specified
+ * instance has this property.
+ * wild card "*" can be used to replace instance name, which means all the instances will have
+ * this property.
+ *
+ * [sink|source] means this property belongs to source or sink. This field can only be source or sink.
+ *
+ * [name] specify the name of sink or source, it is custom defined.
+ *
+ * [options] is the specific property of this source or sink.
+ */
+private[spark] class MetricsSystem private (val instance: String) extends Logging {
+ initLogging()
+
+ val confFile = System.getProperty("spark.metrics.conf")
+ val metricsConfig = new MetricsConfig(Option(confFile))
+
+ val sinks = new mutable.ArrayBuffer[Sink]
+ val sources = new mutable.ArrayBuffer[Source]
+ val registry = new MetricRegistry()
+
+ metricsConfig.initialize()
+ registerSources()
+ registerSinks()
+
+ def start() {
+ sinks.foreach(_.start)
+ }
+
+ def stop() {
+ sinks.foreach(_.stop)
+ }
+
+ def registerSource(source: Source) {
+ sources += source
+ try {
+ registry.register(source.sourceName, source.metricRegistry)
+ } catch {
+ case e: IllegalArgumentException => logInfo("Metrics already registered", e)
+ }
+ }
+
+ def registerSources() {
+ val instConfig = metricsConfig.getInstance(instance)
+ val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
+
+ // Register all the sources related to instance
+ sourceConfigs.foreach { kv =>
+ val classPath = kv._2.getProperty("class")
+ try {
+ val source = Class.forName(classPath).newInstance()
+ registerSource(source.asInstanceOf[Source])
+ } catch {
+ case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+ }
+ }
+ }
+
+ def registerSinks() {
+ val instConfig = metricsConfig.getInstance(instance)
+ val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
+
+ sinkConfigs.foreach { kv =>
+ val classPath = kv._2.getProperty("class")
+ try {
+ val sink = Class.forName(classPath)
+ .getConstructor(classOf[Properties], classOf[MetricRegistry])
+ .newInstance(kv._2, registry)
+ sinks += sink.asInstanceOf[Sink]
+ } catch {
+ case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
+ }
+ }
+ }
+}
+
+private[spark] object MetricsSystem {
+ val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+ val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
+
+ val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+ val MINIMAL_POLL_PERIOD = 1
+
+ def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
+ val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+ if (period < MINIMAL_POLL_PERIOD) {
+ throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
+ " below than minimal polling period ")
+ }
+ }
+
+ def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+}
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
new file mode 100644
index 0000000000..966ba37c20
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import spark.metrics.MetricsSystem
+
+class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val CONSOLE_DEFAULT_PERIOD = 10
+ val CONSOLE_DEFAULT_UNIT = "SECONDS"
+
+ val CONSOLE_KEY_PERIOD = "period"
+ val CONSOLE_KEY_UNIT = "unit"
+
+ val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
+ case Some(s) => s.toInt
+ case None => CONSOLE_DEFAULT_PERIOD
+ }
+
+ val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
+ }
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build()
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
+
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
new file mode 100644
index 0000000000..cb990afdef
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
+import java.io.File
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import spark.metrics.MetricsSystem
+
+class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val CSV_KEY_PERIOD = "period"
+ val CSV_KEY_UNIT = "unit"
+ val CSV_KEY_DIR = "directory"
+
+ val CSV_DEFAULT_PERIOD = 10
+ val CSV_DEFAULT_UNIT = "SECONDS"
+ val CSV_DEFAULT_DIR = "/tmp/"
+
+ val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
+ case Some(s) => s.toInt
+ case None => CSV_DEFAULT_PERIOD
+ }
+
+ val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
+ }
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
+ case Some(s) => s
+ case None => CSV_DEFAULT_DIR
+ }
+
+ val reporter: CsvReporter = CsvReporter.forRegistry(registry)
+ .formatFor(Locale.US)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(new File(pollDir))
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
+
diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
index f41a379582..ee04544c0e 100644
--- a/core/src/main/scala/spark/SoftReferenceCache.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package spark
+package spark.metrics.sink
-import com.google.common.collect.MapMaker
+import com.codahale.metrics.{JmxReporter, MetricRegistry}
-/**
- * An implementation of Cache that uses soft references.
- */
-private[spark] class SoftReferenceCache extends Cache {
- val map = new MapMaker().softValues().makeMap[Any, Any]()
+import java.util.Properties
+
+class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
- override def get(datasetId: Any, partition: Int): Any =
- map.get((datasetId, partition))
+ override def start() {
+ reporter.start()
+ }
- override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
- map.put((datasetId, partition), value)
- return CachePutSuccess(0)
+ override def stop() {
+ reporter.stop()
}
+
}
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
new file mode 100644
index 0000000000..dad1a7f0fe
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+trait Sink {
+ def start: Unit
+ def stop: Unit
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
new file mode 100644
index 0000000000..e771008557
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+
+class JvmSource extends Source {
+ val sourceName = "jvm"
+ val metricRegistry = new MetricRegistry()
+
+ val gcMetricSet = new GarbageCollectorMetricSet
+ val memGaugeSet = new MemoryUsageGaugeSet
+
+ metricRegistry.registerAll(gcMetricSet)
+ metricRegistry.registerAll(memGaugeSet)
+}
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
new file mode 100644
index 0000000000..76199a004b
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+
+trait Source {
+ def sourceName: String
+ def metricRegistry: MetricRegistry
+}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index 191cfde565..d8700becb0 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -33,8 +33,9 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) {
@transient
- val partitions: Array[Partition] = rdd.partitions.filter(s => partitionFilterFunc(s.index))
- .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
+ val partitions: Array[Partition] = rdd.partitions.zipWithIndex
+ .filter(s => partitionFilterFunc(s._2))
+ .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
override def getParents(partitionId: Int) = List(partitions(partitionId).index)
}
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 29e879aa42..9b45fc2938 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -17,19 +17,17 @@
package spark.scheduler
-import cluster.TaskInfo
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.TimeUnit
+import java.io.NotSerializableException
import java.util.Properties
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import spark._
import spark.executor.TaskMetrics
-import spark.partial.ApproximateActionListener
-import spark.partial.ApproximateEvaluator
-import spark.partial.PartialResult
+import spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
+import spark.scheduler.cluster.TaskInfo
import spark.storage.{BlockManager, BlockManagerMaster}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
@@ -52,6 +50,11 @@ class DAGScheduler(
}
taskSched.setListener(this)
+ // Called by TaskScheduler to report task's starting.
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ eventQueue.put(BeginEvent(task, taskInfo))
+ }
+
// Called by TaskScheduler to report task completions or failures.
override def taskEnded(
task: Task[_],
@@ -258,7 +261,8 @@ class DAGScheduler(
assert(partitions.size > 0)
val waiter = new JobWaiter(partitions.size, resultHandler)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
- val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
+ val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+ properties)
return (toSubmit, waiter)
}
@@ -283,7 +287,7 @@ class DAGScheduler(
"Total number of partitions: " + maxPartitions)
}
- val (toSubmit, waiter) = prepareJob(
+ val (toSubmit: JobSubmitted, waiter: JobWaiter[_]) = prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
eventQueue.put(toSubmit)
waiter.awaitResult() match {
@@ -343,6 +347,9 @@ class DAGScheduler(
case ExecutorLost(execId) =>
handleExecutorLost(execId)
+ case begin: BeginEvent =>
+ sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo)))
+
case completion: CompletionEvent =>
sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
completion.reason, completion.taskInfo, completion.taskMetrics)))
@@ -504,6 +511,19 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
+ // Preemptively serialize a task to make sure it can be serialized. We are catching this
+ // exception here because it would be fairly hard to catch the non-serializable exception
+ // down the road, where we have several different implementations for local scheduler and
+ // cluster schedulers.
+ try {
+ SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
+ } catch {
+ case e: NotSerializableException =>
+ abortStage(stage, e.toString)
+ running -= stage
+ return
+ }
+
sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 506c87f65b..3b4ee6287a 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -43,6 +43,8 @@ private[spark] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
+private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+
private[spark] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
new file mode 100644
index 0000000000..87d27cc70d
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -0,0 +1,30 @@
+package spark.scheduler
+
+import com.codahale.metrics.{Gauge,MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "DAGScheduler"
+
+ metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.failed.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.running.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.waiting.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.nextRunId.get()
+ })
+
+ metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.activeJobs.size
+ })
+}
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 85b5ddd4a8..f7565b8c57 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -68,6 +68,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
processStageCompletedEvent(stageInfo)
case SparkListenerJobEnd(job, result) =>
processJobEndEvent(job, result)
+ case SparkListenerTaskStart(task, taskInfo) =>
+ processTaskStartEvent(task, taskInfo)
case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
processTaskEndEvent(task, reason, taskInfo, taskMetrics)
case _ =>
@@ -252,7 +254,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageInfo.stage.id + " STATUS=COMPLETED")
}
-
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ eventQueue.put(taskStart)
+ }
+
+ protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
+ var taskStatus = ""
+ task match {
+ case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+ case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+ }
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
eventQueue.put(taskEnd)
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 361b1e6b91..1ced6f9524 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -118,6 +118,7 @@ private[spark] class ResultTask[T, U](
out.write(bytes)
out.writeInt(partition)
out.writeInt(outputId)
+ out.writeLong(generation)
out.writeObject(split)
}
}
@@ -132,6 +133,7 @@ private[spark] class ResultTask[T, U](
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
val outputId = in.readInt()
+ generation = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 1c25605f75..e3bb6d1e60 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -18,16 +18,9 @@
package spark.scheduler
import java.io._
-import java.util.{HashMap => JHashMap}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
-import com.ning.compress.lzf.LZFInputStream
-import com.ning.compress.lzf.LZFOutputStream
+import scala.collection.mutable.HashMap
import spark._
import spark.executor.ShuffleWriteMetrics
@@ -109,11 +102,7 @@ private[spark] class ShuffleMapTask(
preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
}
- var split = if (rdd == null) {
- null
- } else {
- rdd.partitions(partition)
- }
+ var split = if (rdd == null) null else rdd.partitions(partition)
override def writeExternal(out: ObjectOutput) {
RDDCheckpointData.synchronized {
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4fb1c5d42d..4eb7e4e6a5 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -29,6 +29,8 @@ case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends Spar
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
@@ -48,7 +50,12 @@ trait SparkListener {
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-
+
+ /**
+ * Called when a task starts
+ */
+ def onTaskStart(taskEnd: SparkListenerTaskStart) { }
+
/**
* Called when a task ends
*/
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index 245e7ccb52..2cdeb1c8c0 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -27,6 +27,9 @@ import spark.executor.TaskMetrics
* Interface for getting events back from the TaskScheduler.
*/
private[spark] trait TaskSchedulerListener {
+ // A task has started.
+ def taskStarted(task: Task[_], taskInfo: TaskInfo)
+
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index b569cf6066..bd0cdad573 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -17,7 +17,8 @@
package spark.scheduler.cluster
-import java.util.{HashMap => JHashMap, NoSuchElementException, Arrays}
+import java.nio.ByteBuffer
+import java.util.{Arrays, NoSuchElementException}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -25,12 +26,14 @@ import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
-import spark._
-import spark.scheduler._
+import spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState, Utils}
+import spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
import spark.TaskState.TaskState
-import java.nio.ByteBuffer
+import spark.scheduler.{ShuffleMapTask, Task, TaskResult, TaskSet}
-private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+
+private[spark] object TaskLocality
+ extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
@@ -43,8 +46,10 @@ private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LO
assert (constraint != TaskLocality.PROCESS_LOCAL)
constraint match {
- case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
- case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
+ case TaskLocality.NODE_LOCAL =>
+ condition == TaskLocality.NODE_LOCAL
+ case TaskLocality.RACK_LOCAL =>
+ condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
// For anything else, allow
case _ => true
}
@@ -56,11 +61,10 @@ private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LO
val retval = TaskLocality.withName(str)
// Must not specify PROCESS_LOCAL !
assert (retval != TaskLocality.PROCESS_LOCAL)
-
retval
} catch {
case nEx: NoSuchElementException => {
- logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
+ logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL")
// default to preserve earlier behavior
NODE_LOCAL
}
@@ -71,11 +75,8 @@ private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LO
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-private[spark] class ClusterTaskSetManager(
- sched: ClusterScheduler,
- val taskSet: TaskSet)
- extends TaskSetManager
- with Logging {
+private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet)
+ extends TaskSetManager with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -84,7 +85,7 @@ private[spark] class ClusterTaskSetManager(
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
// Maximum times a task is allowed to fail before failing the job
- val MAX_TASK_FAILURES = 4
+ val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
@@ -107,13 +108,14 @@ private[spark] class ClusterTaskSetManager(
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
- var name = "TaskSet_"+taskSet.stageId.toString
- var parent:Schedulable = null
+ var name = "TaskSet_" + taskSet.stageId.toString
+ var parent: Schedulable = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
- // List of pending tasks for each node (process local to container). These collections are actually
+ // List of pending tasks for each node (process local to container).
+ // These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
@@ -173,9 +175,11 @@ private[spark] class ClusterTaskSetManager(
// Note that it follows the hierarchy.
// if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
// if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
- private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
- taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
-
+ private def findPreferredLocations(
+ _taskPreferredLocations: Seq[String],
+ scheduler: ClusterScheduler,
+ taskLocality: TaskLocality.TaskLocality): HashSet[String] =
+ {
if (TaskLocality.PROCESS_LOCAL == taskLocality) {
// straight forward comparison ! Special case it.
val retval = new HashSet[String]()
@@ -190,13 +194,14 @@ private[spark] class ClusterTaskSetManager(
return retval
}
- val taskPreferredLocations =
+ val taskPreferredLocations = {
if (TaskLocality.NODE_LOCAL == taskLocality) {
_taskPreferredLocations
} else {
assert (TaskLocality.RACK_LOCAL == taskLocality)
// Expand set to include all 'seen' rack local hosts.
- // This works since container allocation/management happens within master - so any rack locality information is updated in msater.
+ // This works since container allocation/management happens within master -
+ // so any rack locality information is updated in msater.
// Best case effort, and maybe sort of kludge for now ... rework it later ?
val hosts = new HashSet[String]
_taskPreferredLocations.foreach(h => {
@@ -214,6 +219,7 @@ private[spark] class ClusterTaskSetManager(
hosts
}
+ }
val retval = new HashSet[String]
scheduler.synchronized {
@@ -230,11 +236,13 @@ private[spark] class ClusterTaskSetManager(
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
- // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
- // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
- val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
- val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
- val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
+ // We can infer hostLocalLocations from rackLocalLocations by joining it against
+ // tasks(index).preferredLocations (with appropriate hostPort <-> host conversion).
+ // But not doing it for simplicity sake. If this becomes a performance issue, modify it.
+ val locs = tasks(index).preferredLocations
+ val processLocalLocations = findPreferredLocations(locs, sched, TaskLocality.PROCESS_LOCAL)
+ val hostLocalLocations = findPreferredLocations(locs, sched, TaskLocality.NODE_LOCAL)
+ val rackLocalLocations = findPreferredLocations(locs, sched, TaskLocality.RACK_LOCAL)
if (rackLocalLocations.size == 0) {
// Current impl ensures this.
@@ -299,18 +307,24 @@ private[spark] class ClusterTaskSetManager(
}
// Number of pending tasks for a given host Port (which would be process local)
- def numPendingTasksForHostPort(hostPort: String): Int = {
- getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+ override def numPendingTasksForHostPort(hostPort: String): Int = {
+ getPendingTasksForHostPort(hostPort).count { index =>
+ copiesRunning(index) == 0 && !finished(index)
+ }
}
// Number of pending tasks for a given host (which would be data local)
- def numPendingTasksForHost(hostPort: String): Int = {
- getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+ override def numPendingTasksForHost(hostPort: String): Int = {
+ getPendingTasksForHost(hostPort).count { index =>
+ copiesRunning(index) == 0 && !finished(index)
+ }
}
// Number of pending rack local tasks for a given host
- def numRackLocalPendingTasksForHost(hostPort: String): Int = {
- getRackLocalPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+ override def numRackLocalPendingTasksForHost(hostPort: String): Int = {
+ getRackLocalPendingTasksForHost(hostPort).count { index =>
+ copiesRunning(index) == 0 && !finished(index)
+ }
}
@@ -338,12 +352,12 @@ private[spark] class ClusterTaskSetManager(
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
if (speculatableTasks.size > 0) {
- val localTask = speculatableTasks.find {
- index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
- val attemptLocs = taskAttempts(index).map(_.hostPort)
- (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
- }
+ val localTask = speculatableTasks.find { index =>
+ val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
+ TaskLocality.NODE_LOCAL)
+ val attemptLocs = taskAttempts(index).map(_.hostPort)
+ (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
+ }
if (localTask != None) {
speculatableTasks -= localTask.get
@@ -352,11 +366,11 @@ private[spark] class ClusterTaskSetManager(
// check for rack locality
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
- val rackTask = speculatableTasks.find {
- index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
- val attemptLocs = taskAttempts(index).map(_.hostPort)
- locations.contains(hostPort) && !attemptLocs.contains(hostPort)
+ val rackTask = speculatableTasks.find { index =>
+ val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
+ TaskLocality.RACK_LOCAL)
+ val attemptLocs = taskAttempts(index).map(_.hostPort)
+ locations.contains(hostPort) && !attemptLocs.contains(hostPort)
}
if (rackTask != None) {
@@ -368,7 +382,9 @@ private[spark] class ClusterTaskSetManager(
// Any task ...
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
// Check for attemptLocs also ?
- val nonLocalTask = speculatableTasks.find(i => !taskAttempts(i).map(_.hostPort).contains(hostPort))
+ val nonLocalTask = speculatableTasks.find { i =>
+ !taskAttempts(i).map(_.hostPort).contains(hostPort)
+ }
if (nonLocalTask != None) {
speculatableTasks -= nonLocalTask.get
return nonLocalTask
@@ -398,7 +414,8 @@ private[spark] class ClusterTaskSetManager(
}
}
- // Look for no pref tasks AFTER rack local tasks - this has side effect that we will get to failed tasks later rather than sooner.
+ // Look for no pref tasks AFTER rack local tasks - this has side effect that we will get to
+ // failed tasks later rather than sooner.
// TODO: That code path needs to be revisited (adding to no prefs list when host:port goes down).
val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
if (noPrefTask != None) {
@@ -434,7 +451,8 @@ private[spark] class ClusterTaskSetManager(
locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
}
- // Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location).
+ // Does a host count as a rack local preferred location for a task?
+ // (assumes host is NOT preferred location).
// This is true if either the task has preferred locations and this host is one, or it has
// no preferred locations (in which we still count the launch as preferred).
private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
@@ -455,14 +473,22 @@ private[spark] class ClusterTaskSetManager(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(execId: String, hostPort: String, availableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
-
+ override def slaveOffer(
+ execId: String,
+ hostPort: String,
+ availableCpus: Double,
+ overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+ {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
// If explicitly specified, use that
val locality = if (overrideLocality != null) overrideLocality else {
// expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
val time = System.currentTimeMillis
- if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
+ if (time - lastPreferredLaunchTime < LOCALITY_WAIT) {
+ TaskLocality.NODE_LOCAL
+ } else {
+ TaskLocality.ANY
+ }
}
findTask(hostPort, locality) match {
@@ -490,6 +516,8 @@ private[spark] class ClusterTaskSetManager(
}
// Serialize and return the task
val startTime = System.currentTimeMillis
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
@@ -497,6 +525,8 @@ private[spark] class ClusterTaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
+ if (taskAttempts(index).size == 1)
+ taskStarted(task,info)
return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
@@ -505,7 +535,7 @@ private[spark] class ClusterTaskSetManager(
return None
}
- def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
@@ -520,6 +550,10 @@ private[spark] class ClusterTaskSetManager(
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
@@ -532,13 +566,14 @@ private[spark] class ClusterTaskSetManager(
decreaseRunningTasks(1)
if (!finished(index)) {
tasksFinished += 1
- logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
- tid, info.duration, tasksFinished, numTasks))
+ logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
+ tid, info.duration, info.hostPort, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
try {
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
- sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+ sched.listener.taskEnded(
+ tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread().getContextClassLoader
@@ -584,8 +619,8 @@ private[spark] class ClusterTaskSetManager(
return
case taskResultTooBig: TaskResultTooBigFailure =>
- logInfo("Loss was due to task %s result exceeding Akka frame size; " +
- "aborting job".format(tid))
+ logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format(
+ tid))
abort("Task %s result exceeded Akka frame size".format(tid))
return
@@ -636,7 +671,7 @@ private[spark] class ClusterTaskSetManager(
}
}
- def error(message: String) {
+ override def error(message: String) {
// Save the error message
abort("Error: " + message)
}
@@ -664,7 +699,8 @@ private[spark] class ClusterTaskSetManager(
}
}
- //TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
+ // TODO: for now we just find Pool not TaskSetManager,
+ // we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
@@ -689,13 +725,15 @@ private[spark] class ClusterTaskSetManager(
// If some task has preferred locations only on hostname, and there are no more executors there,
// put it in the no-prefs list to avoid the wait from delay scheduling
- // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
- // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc.
- // Note: NOT checking process local list - since host local list is super set of that. We need to ad to no prefs only if
- // there is no host local node for the task (not if there is no process local node for the task)
+ // host local tasks - should we push this to rack local or no pref list ? For now, preserving
+ // behavior and moving to no prefs list. Note, this was done due to impliations related to
+ // 'waiting' for data local tasks, etc.
+ // Note: NOT checking process local list - since host local list is super set of that. We need
+ // to ad to no prefs only if there is no host local node for the task (not if there is no
+ // process local node for the task)
for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
- // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
- val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
+ val newLocs = findPreferredLocations(
+ tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
if (newLocs.isEmpty) {
pendingTasksWithNoPrefs += index
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index ac9e5ef94d..05c29eb72f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -17,46 +17,47 @@
package spark.scheduler.cluster
-import spark.TaskState.TaskState
import java.nio.ByteBuffer
-import spark.util.SerializableBuffer
+
+import spark.TaskState.TaskState
import spark.Utils
+import spark.util.SerializableBuffer
+
private[spark] sealed trait StandaloneClusterMessage extends Serializable
-// Driver to executors
-private[spark]
-case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+private[spark] object StandaloneClusterMessages {
-private[spark]
-case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
- extends StandaloneClusterMessage
+ // Driver to executors
+ case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
-private[spark]
-case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
+ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+ extends StandaloneClusterMessage
-// Executors to driver
-private[spark]
-case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
- extends StandaloneClusterMessage {
- Utils.checkHostPort(hostPort, "Expected host port")
-}
+ case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-private[spark]
-case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
- extends StandaloneClusterMessage
+ // Executors to driver
+ case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+ extends StandaloneClusterMessage {
+ Utils.checkHostPort(hostPort, "Expected host port")
+ }
+
+ case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
+ data: SerializableBuffer) extends StandaloneClusterMessage
-private[spark]
-object StatusUpdate {
- /** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
- StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ object StatusUpdate {
+ /** Alternate factory method that takes a ByteBuffer directly for the data field */
+ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
+ : StatusUpdate = {
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ }
}
-}
-// Internal messages in driver
-private[spark] case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case object StopDriver extends StandaloneClusterMessage
+ // Internal messages in driver
+ case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case class RemoveExecutor(executorId: String, reason: String)
- extends StandaloneClusterMessage
+ case object StopDriver extends StandaloneClusterMessage
+
+ case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
+
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 03a64e0192..075a7cbf7e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -17,17 +17,18 @@
package spark.scheduler.cluster
+import java.util.concurrent.atomic.AtomicInteger
+
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
-import akka.util.duration._
+import akka.dispatch.Await
import akka.pattern.ask
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import akka.util.Duration
import spark.{Utils, SparkException, Logging, TaskState}
-import akka.dispatch.Await
-import java.util.concurrent.atomic.AtomicInteger
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import spark.scheduler.cluster.StandaloneClusterMessages._
/**
* A standalone scheduler backend, which waits for standalone executors to connect to it through
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index a1ebd48b01..c693b722ac 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -51,6 +51,17 @@ class TaskInfo(
def running: Boolean = !finished
+ def status: String = {
+ if (running)
+ "RUNNING"
+ else if (failed)
+ "FAILED"
+ else if (successful)
+ "SUCCESS"
+ else
+ "UNKNOWN"
+ }
+
def duration: Long = {
if (!finished) {
throw new UnsupportedOperationException("duration() called on unfinished tasks")
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 07c3ddcc7e..7978a5df74 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,18 +17,28 @@
package spark.scheduler.cluster
-import scala.collection.mutable.ArrayBuffer
-import spark.scheduler._
-import spark.TaskState.TaskState
import java.nio.ByteBuffer
+import spark.TaskState.TaskState
+import spark.scheduler.TaskSet
+
private[spark] trait TaskSetManager extends Schedulable {
+
def taskSet: TaskSet
- def slaveOffer(execId: String, hostPort: String, availableCpus: Double,
- overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
+
+ def slaveOffer(
+ execId: String,
+ hostPort: String,
+ availableCpus: Double,
+ overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
+
def numPendingTasksForHostPort(hostPort: String): Int
- def numRackLocalPendingTasksForHost(hostPort :String): Int
+
+ def numRackLocalPendingTasksForHost(hostPort: String): Int
+
def numPendingTasksForHost(hostPort: String): Int
+
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
+
def error(message: String)
}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index c8cb39184d..7ae8f06f6e 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -37,10 +37,15 @@ import akka.actor._
* testing fault recovery.
*/
-private[spark] case class LocalReviveOffers()
-private[spark] case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+private[spark]
+case class LocalReviveOffers()
+
+private[spark]
+case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private[spark]
+class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
-private[spark] class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
def receive = {
case LocalReviveOffers =>
launchTask(localScheduler.resourceOffer(freeCores))
@@ -55,7 +60,7 @@ private[spark] class LocalActor(localScheduler: LocalScheduler, var freeCores: I
freeCores -= 1
localScheduler.threadPool.submit(new Runnable {
def run() {
- localScheduler.runTask(task.taskId,task.serializedTask)
+ localScheduler.runTask(task.taskId, task.serializedTask)
}
})
}
@@ -110,7 +115,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
override def submitTasks(taskSet: TaskSet) {
synchronized {
- var manager = new LocalTaskSetManager(this, taskSet)
+ val manager = new LocalTaskSetManager(this, taskSet)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
activeTaskSets(taskSet.id) = manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
@@ -124,14 +129,15 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val tasks = new ArrayBuffer[TaskDescription](freeCores)
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue) {
- logDebug("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
+ logDebug("parentName:%s,name:%s,runningTasks:%s".format(
+ manager.parent.name, manager.name, manager.runningTasks))
}
var launchTask = false
for (manager <- sortedTaskSetQueue) {
do {
launchTask = false
- manager.slaveOffer(null,null,freeCpuCores) match {
+ manager.slaveOffer(null, null, freeCpuCores) match {
case Some(task) =>
tasks += task
taskIdToTaskSetId(task.taskId) = manager.taskSet.id
@@ -139,7 +145,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
freeCpuCores -= 1
launchTask = true
case None => {}
- }
+ }
} while(launchTask)
}
return tasks
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 5d0402ed46..9837f5cd69 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -17,27 +17,26 @@
package spark.scheduler.local
-import java.io.File
-import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import spark._
+import spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
import spark.TaskState.TaskState
-import spark.scheduler._
-import spark.scheduler.cluster._
+import spark.scheduler.{Task, TaskResult, TaskSet}
+import spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
+
+
+private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
+ extends TaskSetManager with Logging {
-private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging {
var parent: Schedulable = null
var weight: Int = 1
var minShare: Int = 0
var runningTasks: Int = 0
var priority: Int = taskSet.priority
var stageId: Int = taskSet.stageId
- var name: String = "TaskSet_"+taskSet.stageId.toString
-
+ var name: String = "TaskSet_" + taskSet.stageId.toString
var failCount = new Array[Int](taskSet.tasks.size)
val taskInfos = new HashMap[Long, TaskInfo]
@@ -50,49 +49,45 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val numFailures = new Array[Int](numTasks)
val MAX_TASK_FAILURES = sched.maxFailures
- def increaseRunningTasks(taskNum: Int): Unit = {
- runningTasks += taskNum
- if (parent != null) {
- parent.increaseRunningTasks(taskNum)
- }
+ override def increaseRunningTasks(taskNum: Int): Unit = {
+ runningTasks += taskNum
+ if (parent != null) {
+ parent.increaseRunningTasks(taskNum)
+ }
}
- def decreaseRunningTasks(taskNum: Int): Unit = {
+ override def decreaseRunningTasks(taskNum: Int): Unit = {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
- def addSchedulable(schedulable: Schedulable): Unit = {
+ override def addSchedulable(schedulable: Schedulable): Unit = {
//nothing
}
- def removeSchedulable(schedulable: Schedulable): Unit = {
+ override def removeSchedulable(schedulable: Schedulable): Unit = {
//nothing
}
- def getSchedulableByName(name: String): Schedulable = {
+ override def getSchedulableByName(name: String): Schedulable = {
return null
}
- def executorLost(executorId: String, host: String): Unit = {
+ override def executorLost(executorId: String, host: String): Unit = {
//nothing
}
- def checkSpeculatableTasks(): Boolean = {
- return true
- }
+ override def checkSpeculatableTasks() = true
- def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+ override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
sortedTaskSetQueue += this
return sortedTaskSetQueue
}
- def hasPendingTasks(): Boolean = {
- return true
- }
+ override def hasPendingTasks() = true
def findTask(): Option[Int] = {
for (i <- 0 to numTasks-1) {
@@ -103,21 +98,32 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
- def slaveOffer(execId: String, hostPort: String, availableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
+ override def slaveOffer(
+ execId: String,
+ hostPort: String,
+ availableCpus: Double,
+ overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+ {
SparkEnv.set(sched.env)
- logDebug("availableCpus:%d,numFinished:%d,numTasks:%d".format(availableCpus.toInt, numFinished, numTasks))
+ logDebug("availableCpus:%d,numFinished:%d,numTasks:%d".format(
+ availableCpus.toInt, numFinished, numTasks))
if (availableCpus > 0 && numFinished < numTasks) {
findTask() match {
case Some(index) =>
val taskId = sched.attemptId.getAndIncrement()
val task = taskSet.tasks(index)
- val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
+ val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
+ TaskLocality.NODE_LOCAL)
taskInfos(taskId) = info
- val bytes = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
+ val bytes = Task.serializeWithDependencies(
+ task, sched.sc.addedFiles, sched.sc.addedJars, ser)
logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
val taskName = "task %s:%d".format(taskSet.id, index)
copiesRunning(index) += 1
increaseRunningTasks(1)
+ taskStarted(task, info)
return Some(new TaskDescription(taskId, null, taskName, bytes))
case None => {}
}
@@ -125,19 +131,19 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
- def numPendingTasksForHostPort(hostPort: String): Int = {
+ override def numPendingTasksForHostPort(hostPort: String): Int = {
return 0
}
- def numRackLocalPendingTasksForHost(hostPort :String): Int = {
+ override def numRackLocalPendingTasksForHost(hostPort :String): Int = {
return 0
}
- def numPendingTasksForHost(hostPort: String): Int = {
+ override def numPendingTasksForHost(hostPort: String): Int = {
return 0
}
- def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
@@ -148,6 +154,10 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
}
+ def taskStarted(task: Task[_], info: TaskInfo) {
+ sched.listener.taskStarted(task, info)
+ }
+
def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
val index = info.index
@@ -170,15 +180,18 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val task = taskSet.tasks(index)
info.markFailed()
decreaseRunningTasks(1)
- val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
+ val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
+ serializedData, getClass.getClassLoader)
sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
- logInfo("Loss was due to %s\n%s\n%s".format(reason.className, reason.description, locs.mkString("\n")))
+ logInfo("Loss was due to %s\n%s\n%s".format(
+ reason.className, reason.description, locs.mkString("\n")))
if (numFailures(index) > MAX_TASK_FAILURES) {
- val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(taskSet.id, index, 4, reason.description)
+ val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
+ taskSet.id, index, 4, reason.description)
decreaseRunningTasks(runningTasks)
sched.listener.taskSetFailed(taskSet, errorMessage)
// need to delete failed Taskset from schedule queue
@@ -187,6 +200,6 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
}
- def error(message: String) {
+ override def error(message: String) {
}
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 7bc6040544..6ebbb5ec9b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
- val runScript = new File(sparkHome, "run").getCanonicalPath
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
- val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
.setValue(value)
.build())
}
- return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"),
+ System.getProperty("spark.driver.port"),
+ StandaloneSchedulerBackend.ACTOR_NAME)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
+ val runScript = new File(sparkHome, "run").getCanonicalPath
+ command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
+ return command.build()
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 75b8268b55..f6069a5775 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
.setValue(value)
.build())
}
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
+ command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; ./spark-executor".format(basename))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
.build()
- val command = CommandInfo.newBuilder()
- .setValue(execScript)
- .setEnvironment(environment)
- .build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index e4ffa57ad2..3a72474419 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -27,11 +27,10 @@ import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{Logging, SparkEnv, SparkException, Utils}
+import spark.io.CompressionCodec
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -158,6 +157,13 @@ private[spark] class BlockManager(
val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
+ // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
+ // the initialization of the compression codec until it is first used. The reason is that a Spark
+ // program could be using a user-defined codec in a third party jar, which is loaded in
+ // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
+ // loaded yet.
+ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
+
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
@@ -919,18 +925,14 @@ private[spark] class BlockManager(
* Wrap an output stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
- if (shouldCompress(blockId)) {
- (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
- } else {
- s
- }
+ if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}
/**
* Wrap an input stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: InputStream): InputStream = {
- if (shouldCompress(blockId)) new LZFInputStream(s) else s
+ if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}
def dataSerialize(
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 3186f7c85b..76128e8cff 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -23,6 +23,7 @@ import akka.pattern.ask
import akka.util.Duration
import spark.{Logging, SparkException}
+import spark.storage.BlockManagerMessages._
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 244000d952..011bb6b83d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -29,6 +29,8 @@ import akka.util.Duration
import akka.util.duration._
import spark.{Logging, Utils, SparkException}
+import spark.storage.BlockManagerMessages._
+
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 01de4ccb8f..9375a9ca54 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -22,102 +22,89 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import akka.actor.ActorRef
-//////////////////////////////////////////////////////////////////////////////////
-// Messages from the master to slaves.
-//////////////////////////////////////////////////////////////////////////////////
-private[spark]
-sealed trait ToBlockManagerSlave
-
-// Remove a block from the slaves that have it. This can only be used to remove
-// blocks that the master knows about.
-private[spark]
-case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
-
-// Remove all blocks belonging to a specific RDD.
-private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
-
-
-//////////////////////////////////////////////////////////////////////////////////
-// Messages from slaves to the master.
-//////////////////////////////////////////////////////////////////////////////////
-private[spark]
-sealed trait ToBlockManagerMaster
-
-private[spark]
-case class RegisterBlockManager(
- blockManagerId: BlockManagerId,
- maxMemSize: Long,
- sender: ActorRef)
- extends ToBlockManagerMaster
-
-private[spark]
-case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-private[spark]
-class UpdateBlockInfo(
- var blockManagerId: BlockManagerId,
- var blockId: String,
- var storageLevel: StorageLevel,
- var memSize: Long,
- var diskSize: Long)
- extends ToBlockManagerMaster
- with Externalizable {
-
- def this() = this(null, null, null, 0, 0) // For deserialization only
-
- override def writeExternal(out: ObjectOutput) {
- blockManagerId.writeExternal(out)
- out.writeUTF(blockId)
- storageLevel.writeExternal(out)
- out.writeLong(memSize)
- out.writeLong(diskSize)
+private[storage] object BlockManagerMessages {
+ //////////////////////////////////////////////////////////////////////////////////
+ // Messages from the master to slaves.
+ //////////////////////////////////////////////////////////////////////////////////
+ sealed trait ToBlockManagerSlave
+
+ // Remove a block from the slaves that have it. This can only be used to remove
+ // blocks that the master knows about.
+ case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+ // Remove all blocks belonging to a specific RDD.
+ case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
+
+
+ //////////////////////////////////////////////////////////////////////////////////
+ // Messages from slaves to the master.
+ //////////////////////////////////////////////////////////////////////////////////
+ sealed trait ToBlockManagerMaster
+
+ case class RegisterBlockManager(
+ blockManagerId: BlockManagerId,
+ maxMemSize: Long,
+ sender: ActorRef)
+ extends ToBlockManagerMaster
+
+ case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+ class UpdateBlockInfo(
+ var blockManagerId: BlockManagerId,
+ var blockId: String,
+ var storageLevel: StorageLevel,
+ var memSize: Long,
+ var diskSize: Long)
+ extends ToBlockManagerMaster
+ with Externalizable {
+
+ def this() = this(null, null, null, 0, 0) // For deserialization only
+
+ override def writeExternal(out: ObjectOutput) {
+ blockManagerId.writeExternal(out)
+ out.writeUTF(blockId)
+ storageLevel.writeExternal(out)
+ out.writeLong(memSize)
+ out.writeLong(diskSize)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ blockManagerId = BlockManagerId(in)
+ blockId = in.readUTF()
+ storageLevel = StorageLevel(in)
+ memSize = in.readLong()
+ diskSize = in.readLong()
+ }
}
- override def readExternal(in: ObjectInput) {
- blockManagerId = BlockManagerId(in)
- blockId = in.readUTF()
- storageLevel = StorageLevel(in)
- memSize = in.readLong()
- diskSize = in.readLong()
+ object UpdateBlockInfo {
+ def apply(blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ }
+
+ // For pattern-matching
+ def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+ }
}
-}
-private[spark]
-object UpdateBlockInfo {
- def apply(blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): UpdateBlockInfo = {
- new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
- }
+ case class GetLocations(blockId: String) extends ToBlockManagerMaster
- // For pattern-matching
- def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
- }
-}
+ case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
+ case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+ case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+ case object StopBlockManagerMaster extends ToBlockManagerMaster
-private[spark]
-case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
+ case object GetMemoryStatus extends ToBlockManagerMaster
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
+ case object ExpireDeadHosts extends ToBlockManagerMaster
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
-
-private[spark]
-case object ExpireDeadHosts extends ToBlockManagerMaster
-
-private[spark]
-case object GetStorageStatus extends ToBlockManagerMaster
+ case object GetStorageStatus extends ToBlockManagerMaster
+}
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
index 45cffad810..6e5fb43732 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -19,7 +19,7 @@ package spark.storage
import akka.actor.Actor
-import spark.{Logging, SparkException, Utils}
+import spark.storage.BlockManagerMessages._
/**
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
new file mode 100644
index 0000000000..2aecd1ea71
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -0,0 +1,48 @@
+package spark.storage
+
+import com.codahale.metrics.{Gauge,MetricRegistry}
+
+import spark.metrics.source.Source
+
+
+private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "BlockManager"
+
+ metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ maxMem / 1024 / 1024
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ remainingMem / 1024 / 1024
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ (maxMem - remainingMem) / 1024 / 1024
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val diskSpaceUsed = storageStatusList
+ .flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_ + _)
+ .getOrElse(0L)
+
+ diskSpaceUsed / 1024 / 1024
+ }
+ })
+}
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index ab72dbb62b..bcce26b7c1 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
-import spark._
import spark.network._
private[spark] case class GetBlock(id: String)
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index b0229d6124..ee2fc167d5 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -19,7 +19,6 @@ package spark.storage
import java.nio.ByteBuffer
-import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
import spark._
@@ -113,7 +112,7 @@ private[spark] object BlockMessageArray {
def main(args: Array[String]) {
val blockMessages =
- (0 until 10).map(i => {
+ (0 until 10).map { i =>
if (i % 2 == 0) {
val buffer = ByteBuffer.allocate(100)
buffer.clear
@@ -121,7 +120,7 @@ private[spark] object BlockMessageArray {
} else {
BlockMessage.fromGetBlock(GetBlock(i.toString))
}
- })
+ }
val blockMessageArray = new BlockMessageArray(blockMessages)
println("Block message array created")
diff --git a/core/src/main/scala/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/spark/storage/BlockObjectWriter.scala
index 01ed6e8c1f..3812009ca1 100644
--- a/core/src/main/scala/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/spark/storage/BlockObjectWriter.scala
@@ -17,8 +17,6 @@
package spark.storage
-import java.nio.ByteBuffer
-
/**
* An interface for writing JVM objects to some underlying storage. This interface allows
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3495d653bd..3ebfe173b1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -66,7 +66,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
objOut.close()
- bs.close()
channel = null
bs = null
objOut = null
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 20ea54d6a6..4be2bfa413 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -5,7 +5,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Properties
import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
@@ -18,7 +18,6 @@ import spark.ui.JettyUtils._
import spark.ui.Page.Executors
import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
-import spark.Utils
import scala.xml.{Node, XML}
@@ -45,7 +44,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.reduceOption(_+_).getOrElse(0L)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
def execRow(kv: Seq[String]) =
<tr>
<td>{kv(0)}</td>
@@ -60,6 +59,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(6)}</td>
<td>{kv(7)}</td>
<td>{kv(8)}</td>
+ <td>{kv(9)}</td>
</tr>
val execInfo =
for (b <- 0 until storageStatusList.size)
@@ -93,6 +93,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
+ val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size)
+ .getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -104,6 +106,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
memUsed,
maxMem,
diskUsed,
+ activeTasks,
failedTasks,
completedTasks,
totalTasks
@@ -111,13 +114,26 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
+ val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val eid = taskStart.taskInfo.executorId
+ val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ activeTasks += taskStart.taskInfo
+ val taskList = executorToTaskInfos.getOrElse(
+ eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ executorToTaskInfos(eid) = taskList
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
+ val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -125,12 +141,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
(Some(e), e.metrics)
case _ =>
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
- (None, Some(taskEnd.taskMetrics))
+ (None, Option(taskEnd.taskMetrics))
}
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
executorToTaskInfos(eid) = taskList
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index f31af3cda6..a843b5ea2f 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -21,13 +21,16 @@ import java.util.Date
import javax.servlet.http.HttpServletRequest
+import scala.collection.mutable.HashSet
import scala.Some
import scala.xml.{NodeSeq, Node}
+import spark.scheduler.cluster.TaskInfo
import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
import spark.storage.StorageLevel
+import spark.ui.Page._
+import spark.ui.UIUtils._
+import spark.Utils
/** Page showing list of all ongoing and recently finished stages */
private[spark] class IndexPage(parent: JobProgressUI) {
@@ -38,6 +41,12 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
+ val now = System.currentTimeMillis()
+
+ var activeTime = 0L
+ for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
+ activeTime += t.timeRunning(now)
+ }
/** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
@@ -48,7 +57,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<th>Submitted</th>
<th>Duration</th>
<th colspan="2">Tasks: Complete/Total</th>
- <th>Shuffle Activity</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
<th>Stored RDD</th>
</thead>
<tbody>
@@ -57,11 +67,33 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</table>
}
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.totalTime + activeTime)}
+ </li>
+ {if (listener.totalShuffleRead > 0)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(listener.totalShuffleRead)}
+ </li>
+ }
+ {if (listener.totalShuffleWrite > 0)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(listener.totalShuffleWrite)}
+ </li>
+ }
+ </ul>
+ </div>
val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
val completedStageTable = stageTable(stageRow, completedStages)
val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
- val content = <h2>Active Stages</h2> ++ activeStageTable ++
+ val content = summary ++
+ <h2>Active Stages</h2> ++ activeStageTable ++
<h2>Completed Stages</h2> ++ completedStageTable ++
<h2>Failed Stages</h2> ++ failedStageTable
@@ -75,17 +107,14 @@ private[spark] class IndexPage(parent: JobProgressUI) {
}
}
- def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
- val width=130
- val height=15
- val completeWidth = (completed.toDouble / total) * width
-
- <svg width={width.toString} height={height.toString}>
- <rect width={width.toString} height={height.toString}
- fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
- <rect width={completeWidth.toString} height={height.toString}
- fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
- </svg>
+ def makeProgressBar(started: Int, completed: Int, total: Int): Seq[Node] = {
+ val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+ val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+ <div class="progress" style="height: 15px; margin-bottom: 0px">
+ <div class="bar" style={completeWidth}></div>
+ <div class="bar bar-info" style={startWidth}></div>
+ </div>
}
@@ -94,13 +123,17 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
- val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
- val shuffleInfo = (read, write) match {
- case (true, true) => "Read/Write"
- case (true, false) => "Read"
- case (false, true) => "Write"
- case _ => ""
+
+ val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
}
+ val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
+ }
+
+ val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions
@@ -110,14 +143,15 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
- <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
+ <td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
<td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ =>
}}
</td>
- <td>{shuffleInfo}</td>
+ <td>{shuffleRead}</td>
+ <td>{shuffleWrite}</td>
<td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
<a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
{Option(s.rdd.name).getOrElse(s.rdd.id)}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 44dcf82d11..09d24b6302 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -65,6 +65,15 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
+ // Total metrics reflect metrics only for completed tasks
+ var totalTime = 0L
+ var totalShuffleRead = 0L
+ var totalShuffleWrite = 0L
+
+ val stageToTime = HashMap[Int, Long]()
+ val stageToShuffleRead = HashMap[Int, Long]()
+ val stageToShuffleWrite = HashMap[Int, Long]()
+ val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
@@ -85,6 +94,12 @@ private[spark] class JobProgressListener extends SparkListener {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id)
+ stageToTime.remove(s.id)
+ stageToShuffleRead.remove(s.id)
+ stageToShuffleWrite.remove(s.id)
+ stageToTasksActive.remove(s.id)
+ stageToTasksComplete.remove(s.id)
+ stageToTasksFailed.remove(s.id)
})
stages.trimEnd(toRemove)
}
@@ -93,8 +108,20 @@ private[spark] class JobProgressListener extends SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
activeStages += stageSubmitted.stage
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val sid = taskStart.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive += taskStart.taskInfo
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ stageToTaskInfos(sid) = taskList
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -102,10 +129,29 @@ private[spark] class JobProgressListener extends SparkListener {
(Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Some(taskEnd.taskMetrics))
+ (None, Option(taskEnd.taskMetrics))
}
+
+ stageToTime.getOrElseUpdate(sid, 0L)
+ val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+ stageToTime(sid) += time
+ totalTime += time
+
+ stageToShuffleRead.getOrElseUpdate(sid, 0L)
+ val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+ s.remoteBytesRead).getOrElse(0L)
+ stageToShuffleRead(sid) += shuffleRead
+ totalShuffleRead += shuffleRead
+
+ stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+ val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+ s.shuffleBytesWritten).getOrElse(0L)
+ stageToShuffleWrite(sid) += shuffleWrite
+ totalShuffleWrite += shuffleWrite
+
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
@@ -123,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener {
case _ =>
}
}
-
- /** Is this stage's input from a shuffle read. */
- def hasShuffleRead(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
-
- /** Is this stage's output to a shuffle write. */
- def hasShuffleWrite(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 292966f23a..e327cb3947 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -37,23 +37,51 @@ private[spark] class StagePage(parent: JobProgressUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val stageId = request.getParameter("id").toInt
+ val now = System.currentTimeMillis()
if (!listener.stageToTaskInfos.contains(stageId)) {
val content =
<div>
- <h2>Summary Metrics</h2> No tasks have finished yet
- <h2>Tasks</h2> No tasks have finished yet
+ <h2>Summary Metrics</h2> No tasks have started yet
+ <h2>Tasks</h2> No tasks have started yet
</div>
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
val tasks = listener.stageToTaskInfos(stageId)
- val shuffleRead = listener.hasShuffleRead(stageId)
- val shuffleWrite = listener.hasShuffleWrite(stageId)
+ val shuffleRead = listener.stageToShuffleRead(stageId) > 0
+ val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
+
+ var activeTime = 0L
+ listener.stageToTasksActive(stageId).foreach { t =>
+ activeTime += t.timeRunning(now)
+ }
+
+ val summary =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.stageToTime(stageId) + activeTime)}
+ </li>
+ {if (shuffleRead)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
+ </li>
+ }
+ {if (shuffleWrite)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
+ </li>
+ }
+ </ul>
+ </div>
val taskHeaders: Seq[String] =
- Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
+ Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("Details")
@@ -61,7 +89,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskTable = listingTable(taskHeaders, taskRow, tasks)
// Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => Option(t._2).isDefined)
+ val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined))
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@@ -98,7 +126,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val content =
- <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable;
+ summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
+ <h2>Tasks</h2> ++ taskTable;
headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
@@ -108,10 +137,17 @@ private[spark] class StagePage(parent: JobProgressUI) {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
+
+ val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
+ else metrics.map(m => m.executorRunTime).getOrElse(1)
+ val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
+ else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+
<tr>
<td>{info.taskId}</td>
- <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
- {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
+ <td>{info.status}</td>
+ <td sorttable_customkey={duration.toString}>
+ {formatDuration}
</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index ed49386f18..a47cac3b96 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -73,7 +73,6 @@ class Vector(val elements: Array[Double]) extends Serializable {
def += (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
- var ans = 0.0
var i = 0
while (i < length) {
elements(i) += other(i)
@@ -117,9 +116,7 @@ object Vector {
def apply(elements: Double*) = new Vector(elements.toArray)
def apply(length: Int, initializer: Int => Double): Vector = {
- val elements = new Array[Double](length)
- for (i <- 0 until length)
- elements(i) = initializer(i)
+ val elements: Array[Double] = Array.tabulate(length)(initializer)
return new Vector(elements)
}
diff --git a/core/src/test/resources/test_metrics_config.properties b/core/src/test/resources/test_metrics_config.properties
new file mode 100644
index 0000000000..2b31ddf2eb
--- /dev/null
+++ b/core/src/test/resources/test_metrics_config.properties
@@ -0,0 +1,6 @@
+*.sink.console.period = 10
+*.sink.console.unit = seconds
+*.source.jvm.class = spark.metrics.source.JvmSource
+master.sink.console.period = 20
+master.sink.console.unit = minutes
+
diff --git a/core/src/test/resources/test_metrics_system.properties b/core/src/test/resources/test_metrics_system.properties
new file mode 100644
index 0000000000..d5479f0298
--- /dev/null
+++ b/core/src/test/resources/test_metrics_system.properties
@@ -0,0 +1,7 @@
+*.sink.console.period = 10
+*.sink.console.unit = seconds
+test.sink.console.class = spark.metrics.sink.ConsoleSink
+test.sink.dummy.class = spark.metrics.sink.DummySink
+test.source.dummy.class = spark.metrics.source.DummySource
+test.sink.console.period = 20
+test.sink.console.unit = minutes
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 6c847b8fef..5b133cdd6e 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -18,9 +18,6 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-
-import scala.collection.mutable.ArrayBuffer
import SparkContext._
@@ -40,7 +37,7 @@ object FailureSuiteState {
}
class FailureSuite extends FunSuite with LocalSparkContext {
-
+
// Run a 3-task map job in which task 1 deterministically fails once, and check
// whether the job completes successfully and we ran 4 tasks in total.
test("failure in a single-stage job") {
@@ -66,7 +63,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
test("failure in a two-stage job") {
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
- case (k, v) =>
+ case (k, v) =>
FailureSuiteState.synchronized {
FailureSuiteState.tasksRun += 1
if (k == 1 && FailureSuiteState.tasksFailed == 0) {
@@ -87,12 +84,40 @@ class FailureSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
- val thrown = intercept[spark.SparkException] {
+ val thrown = intercept[SparkException] {
results.collect()
}
- assert(thrown.getClass === classOf[spark.SparkException])
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("NotSerializableException"))
+
+ FailureSuiteState.clear()
+ }
+
+ test("failure because task closure is not serializable") {
+ sc = new SparkContext("local[1,1]", "test")
+ val a = new NonSerializable
+
+ // Non-serializable closure in the final result stage
+ val thrown = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => a).count()
+ }
+ assert(thrown.getClass === classOf[SparkException])
assert(thrown.getMessage.contains("NotSerializableException"))
+ // Non-serializable closure in an earlier stage
+ val thrown1 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
+ }
+ assert(thrown1.getClass === classOf[SparkException])
+ assert(thrown1.getMessage.contains("NotSerializableException"))
+
+ // Non-serializable closure in foreach function
+ val thrown2 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).foreach(x => println(a))
+ }
+ assert(thrown2.getClass === classOf[SparkException])
+ assert(thrown2.getMessage.contains("NotSerializableException"))
+
FailureSuiteState.clear()
}
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 6d9d3129c2..f03d1c822a 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -18,14 +18,11 @@
package spark
import scala.collection.mutable
-import scala.collection.immutable
import org.scalatest.FunSuite
import com.esotericsoftware.kryo._
-import SparkContext._
import spark.test._
-
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("basic types") {
val ser = (new KryoSerializer).newInstance()
@@ -54,6 +51,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
check(Array(true, false, true))
check(Array('a', 'b', 'c'))
check(Array[Int]())
+ check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
}
test("pairs") {
@@ -104,7 +102,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("custom registrator") {
- import spark.test._
+ import KryoTest._
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
val ser = (new KryoSerializer).newInstance()
@@ -124,7 +122,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
val hashMap = new java.util.HashMap[String, String]
hashMap.put("foo", "bar")
check(hashMap)
-
+
System.clearProperty("spark.kryo.registrator")
}
@@ -168,9 +166,9 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
}
-package test {
+object KryoTest {
case class CaseClass(i: Int, s: String) {}
-
+
class ClassWithNoArgConstructor {
var x: Int = 0
override def equals(other: Any) = other match {
@@ -194,4 +192,4 @@ package test {
k.register(classOf[java.util.HashMap[_, _]])
}
}
-}
+} \ No newline at end of file
diff --git a/core/src/test/scala/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/spark/PartitionPruningRDDSuite.scala
new file mode 100644
index 0000000000..88352b639f
--- /dev/null
+++ b/core/src/test/scala/spark/PartitionPruningRDDSuite.scala
@@ -0,0 +1,28 @@
+package spark
+
+import org.scalatest.FunSuite
+import spark.SparkContext._
+import spark.rdd.PartitionPruningRDD
+
+
+class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
+
+ test("Pruned Partitions inherit locality prefs correctly") {
+ class TestPartition(i: Int) extends Partition {
+ def index = i
+ }
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(1),
+ new TestPartition(2),
+ new TestPartition(3))
+ }
+ def compute(split: Partition, context: TaskContext) = {Iterator()}
+ }
+ val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false})
+ val p = prunedRDD.partitions(0)
+ assert(p.index == 2)
+ assert(prunedRDD.partitions.length == 1)
+ }
+}
diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
new file mode 100644
index 0000000000..1ba82fe2b9
--- /dev/null
+++ b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.io
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import org.scalatest.FunSuite
+
+
+class CompressionCodecSuite extends FunSuite {
+
+ def testCodec(codec: CompressionCodec) {
+ // Write 1000 integers to the output stream, compressed.
+ val outputStream = new ByteArrayOutputStream()
+ val out = codec.compressedOutputStream(outputStream)
+ for (i <- 1 until 1000) {
+ out.write(i % 256)
+ }
+ out.close()
+
+ // Read the 1000 integers back.
+ val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
+ val in = codec.compressedInputStream(inputStream)
+ for (i <- 1 until 1000) {
+ assert(in.read() === i % 256)
+ }
+ in.close()
+ }
+
+ test("default compression codec") {
+ val codec = CompressionCodec.createCodec()
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("lzf compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+ assert(codec.getClass === classOf[LZFCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("snappy compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+}
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
new file mode 100644
index 0000000000..87cd2ffad2
--- /dev/null
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -0,0 +1,64 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileOutputStream}
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import spark.metrics._
+
+class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
+ var filePath: String = _
+
+ before {
+ filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
+ }
+
+ test("MetricsConfig with default properties") {
+ val conf = new MetricsConfig(Option("dummy-file"))
+ conf.initialize()
+
+ assert(conf.properties.size() === 0)
+ assert(conf.properties.getProperty("test-for-dummy") === null)
+
+ val property = conf.getInstance("random")
+ assert(property.size() === 0)
+ }
+
+ test("MetricsConfig with properties set") {
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
+
+ val masterProp = conf.getInstance("master")
+ assert(masterProp.size() === 3)
+ assert(masterProp.getProperty("sink.console.period") === "20")
+ assert(masterProp.getProperty("sink.console.unit") === "minutes")
+ assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+
+ val workerProp = conf.getInstance("worker")
+ assert(workerProp.size() === 3)
+ assert(workerProp.getProperty("sink.console.period") === "10")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ }
+
+ test("MetricsConfig with subProperties") {
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
+
+ val propCategories = conf.propertyCategories
+ assert(propCategories.size === 2)
+
+ val masterProp = conf.getInstance("master")
+ val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
+ assert(sourceProps.size === 1)
+ assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
+
+ val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
+ assert(sinkProps.size === 1)
+ assert(sinkProps.contains("console"))
+
+ val consoleProps = sinkProps("console")
+ assert(consoleProps.size() === 2)
+ }
+}
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
new file mode 100644
index 0000000000..c189996417
--- /dev/null
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -0,0 +1,39 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileOutputStream}
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import spark.metrics._
+
+class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
+ var filePath: String = _
+
+ before {
+ filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
+ System.setProperty("spark.metrics.conf", filePath)
+ }
+
+ test("MetricsSystem with default config") {
+ val metricsSystem = MetricsSystem.createMetricsSystem("default")
+ val sources = metricsSystem.sources
+ val sinks = metricsSystem.sinks
+
+ assert(sources.length === 0)
+ assert(sinks.length === 0)
+ }
+
+ test("MetricsSystem with sources add") {
+ val metricsSystem = MetricsSystem.createMetricsSystem("test")
+ val sources = metricsSystem.sources
+ val sinks = metricsSystem.sinks
+
+ assert(sources.length === 0)
+ assert(sinks.length === 1)
+
+ val source = new spark.deploy.master.MasterSource(null)
+ metricsSystem.registerSource(source)
+ assert(sources.length === 1)
+ }
+}
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 8f81d0b6ee..05afcd6567 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -33,7 +33,7 @@ class DummyTaskSetManager(
initNumTasks: Int,
clusterScheduler: ClusterScheduler,
taskSet: TaskSet)
- extends ClusterTaskSetManager(clusterScheduler,taskSet) {
+ extends ClusterTaskSetManager(clusterScheduler, taskSet) {
parent = null
weight = 1
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index f802b66cf1..a8b88d7936 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,21 +23,14 @@ import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import spark.LocalSparkContext
-
-import spark.storage.BlockManager
-import spark.storage.BlockManagerId
-import spark.storage.BlockManagerMaster
-import spark.{Dependency, ShuffleDependency, OneToOneDependency}
-import spark.FetchFailedException
import spark.MapOutputTracker
import spark.RDD
import spark.SparkContext
-import spark.SparkException
import spark.Partition
import spark.TaskContext
-import spark.TaskEndReason
-
-import spark.{FetchFailed, Success}
+import spark.{Dependency, ShuffleDependency, OneToOneDependency}
+import spark.{FetchFailed, Success, TaskEndReason}
+import spark.storage.{BlockManagerId, BlockManagerMaster}
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
diff --git a/docs/configuration.md b/docs/configuration.md
index 5c06897cae..99624a44aa 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -35,7 +35,7 @@ for these variables.
* `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
-* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
+* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
JVM's -Xmx option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of
the `spark.executor.memory` system property, so we recommend using that in new code.
@@ -77,7 +77,7 @@ there are at least five properties that you will commonly want to control:
Class to use for serializing objects that will be sent over the network or need to be cached
in serialized form. The default of Java serialization works with any Serializable Java object but is
quite slow, so we recommend <a href="tuning.html">using <code>spark.KryoSerializer</code>
- and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
+ and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
<a href="api/core/index.html#spark.Serializer"><code>spark.Serializer</code></a>).
</td>
</tr>
@@ -86,7 +86,7 @@ there are at least five properties that you will commonly want to control:
<td>(none)</td>
<td>
If you use Kryo serialization, set this class to register your custom classes with Kryo.
- You need to set it to a class that extends
+ You need to set it to a class that extends
<a href="api/core/index.html#spark.KryoRegistrator"><code>spark.KryoRegistrator</code></a>).
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
</td>
@@ -181,6 +181,21 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.io.compression.codec</td>
+ <td>spark.io.SnappyCompressionCodec</td>
+ <td>
+ The compression codec class to use for various compressions. By default, Spark provides two
+ codecs: <code>spark.io.LZFCompressionCodec</code> and <code>spark.io.SnappyCompressionCodec</code>.
+ </td>
+</tr>
+<tr>
+ <td>spark.io.compression.snappy.block.size</td>
+ <td>32768</td>
+ <td>
+ Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec is used.
+ </td>
+</tr>
+<tr>
<td>spark.reducer.maxMbInFlight</td>
<td>48</td>
<td>
@@ -295,6 +310,14 @@ Apart from these, the following properties are also available, and may be useful
Duration (milliseconds) of how long to batch new objects coming from network receivers.
</td>
</tr>
+<tr>
+ <td>spark.task.maxFailures</td>
+ <td>4</td>
+ <td>
+ Number of individual task failures before giving up on the job.
+ Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+ </td>
+</tr>
</table>
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index e8aaac74d0..794bff5647 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -10,6 +10,7 @@ To learn the basics of Spark, we recommend reading through the
easy to follow even if you don't know Scala.
This guide will show how to use the Spark features described there in Python.
+
# Key Differences in the Python API
There are a few key differences between the Python and Scala APIs:
@@ -50,6 +51,7 @@ PySpark will automatically ship these functions to workers, along with any objec
Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
+
# Installing and Configuring PySpark
PySpark requires Python 2.6 or higher.
@@ -81,16 +83,41 @@ The Python shell can be used explore data interactively and is a simple way to l
>>> help(pyspark) # Show all pyspark functions
{% endhighlight %}
-By default, the `pyspark` shell creates SparkContext that runs jobs locally.
-To connect to a non-local cluster, set the `MASTER` environment variable.
+By default, the `pyspark` shell creates SparkContext that runs jobs locally on a single core.
+To connect to a non-local cluster, or use multiple cores, set the `MASTER` environment variable.
For example, to use the `pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
{% highlight bash %}
$ MASTER=spark://IP:PORT ./pyspark
{% endhighlight %}
+Or, to use four cores on the local machine:
+
+{% highlight bash %}
+$ MASTER=local[4] ./pyspark
+{% endhighlight %}
+
+
+## IPython
+
+It is also possible to launch PySpark in [IPython](http://ipython.org), the enhanced Python interpreter.
+To do this, simply set the `IPYTHON` variable to `1` when running `pyspark`:
+
+{% highlight bash %}
+$ IPYTHON=1 ./pyspark
+{% endhighlight %}
+
+Alternatively, you can customize the `ipython` command by setting `IPYTHON_OPTS`. For example, to launch
+the [IPython Notebook](http://ipython.org/notebook.html) with PyLab graphing support:
-# Standalone Use
+{% highlight bash %}
+$ IPYTHON_OPTS="notebook --pylab inline" ./pyspark
+{% endhighlight %}
+
+IPython also works on a cluster or on multiple cores if you set the `MASTER` environment variable.
+
+
+# Standalone Programs
PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `pyspark`.
The Quick Start guide includes a [complete example](quick-start.html#a-standalone-job-in-python) of a standalone Python job.
@@ -105,6 +132,7 @@ sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
+
# Where to Go from Here
PySpark includes several sample programs in the [`python/examples` folder](https://github.com/mesos/spark/tree/master/python/examples).
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 2ec3c007fb..740ec08542 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -9,9 +9,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -53,7 +53,7 @@ def parse_args():
help="Seconds to wait for nodes to start (default: 120)")
parser.add_option("-k", "--key-pair",
help="Key pair to use on instances")
- parser.add_option("-i", "--identity-file",
+ parser.add_option("-i", "--identity-file",
help="SSH private key file to use for logging into instances")
parser.add_option("-t", "--instance-type", default="m1.large",
help="Type of instance to launch (default: m1.large). " +
@@ -69,7 +69,7 @@ def parse_args():
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
"available AMI (default: latest)")
- parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
+ parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)")
parser.add_option("--resume", action="store_true", default=False,
@@ -99,7 +99,7 @@ def parse_args():
help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
help="When destroying a cluster, delete the security groups that were created")
-
+
(opts, args) = parser.parse_args()
if len(args) != 2:
parser.print_help()
@@ -112,7 +112,7 @@ def parse_args():
if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
sys.exit(1)
-
+
# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
home_dir = os.getenv('HOME')
@@ -178,6 +178,7 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
+ master_group.authorize('tcp', 33000, 33010, '0.0.0.0/0')
if opts.cluster_type == "mesos":
master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia:
@@ -257,7 +258,7 @@ def launch_cluster(conn, opts, cluster_name):
block_device_map = block_map)
my_req_ids += [req.id for req in slave_reqs]
i += 1
-
+
print "Waiting for spot instances to be granted..."
try:
while True:
@@ -413,7 +414,7 @@ def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/root/spark/bin/start-all.sh")
-
+
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
@@ -528,7 +529,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
dest.write(text)
dest.close()
# rsync the whole directory over to the master machine
- command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
+ command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
"'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
subprocess.check_call(command, shell=True)
# Remove the temp directory we created above
@@ -557,9 +558,9 @@ def ssh(host, opts, command):
print "Error connecting to host {0}, sleeping 30".format(e)
time.sleep(30)
tries = tries + 1
-
-
-
+
+
+
# Gets a list of zones to launch instances in
@@ -618,12 +619,12 @@ def main():
print "Terminating zoo..."
for inst in zoo_nodes:
inst.terminate()
-
+
# Delete security groups as well
if opts.delete_groups:
print "Deleting security groups (this will take some time)..."
group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
-
+
attempt = 1;
while attempt <= 3:
print "Attempt %d" % attempt
@@ -639,7 +640,7 @@ def main():
from_port=rule.from_port,
to_port=rule.to_port,
src_group=grant)
-
+
# Sleep for AWS eventual-consistency to catch up, and for instances
# to terminate
time.sleep(30) # Yes, it does have to be this long :-(
@@ -650,13 +651,13 @@ def main():
except boto.exception.EC2ResponseError:
success = False;
print "Failed to delete security group " + group.name
-
+
# Unfortunately, group.revoke() returns True even if a rule was not
# deleted, so this needs to be rerun if something fails
if success: break;
-
+
attempt += 1
-
+
if not success:
print "Failed to delete all security groups after 3 tries."
print "Try re-running in a few minutes."
@@ -679,7 +680,7 @@ def main():
elif action == "stop":
response = raw_input("Are you sure you want to stop the cluster " +
cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
- "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
+ "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
diff --git a/make-distribution.sh b/make-distribution.sh
index 4374e0e8c4..0a8941c1f8 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
cp -r "$FWDIR/bin" "$DISTDIR"
cp -r "$FWDIR/conf" "$DISTDIR"
cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+cp "$FWDIR/spark-executor" "$DISTDIR"
if [ "$1" == "tgz" ]; then
diff --git a/mllib/pom.xml b/mllib/pom.xml
new file mode 100644
index 0000000000..f3928cc73d
--- /dev/null
+++ b/mllib/pom.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-mllib</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project ML Library</name>
+ <url>http://spark-project.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jblas</groupId>
+ <artifactId>jblas</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2-yarn</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2-yarn</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
new file mode 100644
index 0000000000..d6154b66ae
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
@@ -0,0 +1,21 @@
+package spark.mllib.classification
+
+import spark.RDD
+
+trait ClassificationModel extends Serializable {
+ /**
+ * Predict values for the given data set using the model trained.
+ *
+ * @param testData RDD representing data points to be predicted
+ * @return RDD[Int] where each entry contains the corresponding prediction
+ */
+ def predict(testData: RDD[Array[Double]]): RDD[Int]
+
+ /**
+ * Predict values for a single data point using the model trained.
+ *
+ * @param testData array representing a single data point
+ * @return Int prediction from the trained model
+ */
+ def predict(testData: Array[Double]): Int
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 711e205c39..203aa8fdd4 100644
--- a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package spark.mllib.regression
+package spark.mllib.classification
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
import spark.mllib.util.MLUtils
+import scala.math.round
+
import org.jblas.DoubleMatrix
/**
@@ -30,44 +32,30 @@ import org.jblas.DoubleMatrix
class LogisticRegressionModel(
val weights: Array[Double],
val intercept: Double,
- val stochasticLosses: Array[Double]) extends RegressionModel {
+ val stochasticLosses: Array[Double]) extends ClassificationModel {
// Create a column vector that can be used for predictions
private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
- override def predict(testData: spark.RDD[Array[Double]]) = {
+ override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
testData.map { x =>
- val margin = new DoubleMatrix(1, x.length, x:_*).mmul(weightsMatrix).get(0) + this.intercept
- 1.0/ (1.0 + math.exp(margin * -1))
+ val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
+ round(1.0/ (1.0 + math.exp(margin * -1))).toInt
}
}
- override def predict(testData: Array[Double]): Double = {
+ override def predict(testData: Array[Double]): Int = {
val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
- 1.0/ (1.0 + math.exp(margin * -1))
- }
-}
-
-class LogisticGradient extends Gradient {
- override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
- (DoubleMatrix, Double) = {
- val margin: Double = -1.0 * data.dot(weights)
- val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
-
- val gradient = data.mul(gradientMultiplier)
- val loss =
- if (margin > 0) {
- math.log(1 + math.exp(0 - margin))
- } else {
- math.log(1 + math.exp(margin)) - margin
- }
-
- (gradient, loss)
+ round(1.0/ (1.0 + math.exp(margin * -1))).toInt
}
}
-class LogisticRegression private (var stepSize: Double, var miniBatchFraction: Double,
+class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
var numIters: Int)
extends Logging {
@@ -100,19 +88,19 @@ class LogisticRegression private (var stepSize: Double, var miniBatchFraction: D
this
}
- def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
+ def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
val nfeatures: Int = input.take(1)(0)._2.length
val initialWeights = Array.fill(nfeatures)(1.0)
train(input, initialWeights)
}
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[(Int, Array[Double])],
initialWeights: Array[Double]): LogisticRegressionModel = {
// Add a extra variable consisting of all 1.0's for the intercept.
val data = input.map { case (y, features) =>
- (y, Array(1.0, features:_*))
+ (y.toDouble, Array(1.0, features:_*))
}
val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
@@ -123,6 +111,7 @@ class LogisticRegression private (var stepSize: Double, var miniBatchFraction: D
new SimpleUpdater(),
stepSize,
numIters,
+ 0.0,
initalWeightsWithIntercept,
miniBatchFraction)
@@ -143,11 +132,11 @@ class LogisticRegression private (var stepSize: Double, var miniBatchFraction: D
* NOTE(shivaram): We use multiple train methods instead of default arguments to support
* Java programs.
*/
-object LogisticRegression {
+object LogisticRegressionLocalRandomSGD {
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. Each iteration uses
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
* gradient descent are initialized using the initial weights provided.
*
@@ -159,48 +148,51 @@ object LogisticRegression {
* the number of features in the data.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[(Int, Array[Double])],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
: LogisticRegressionModel =
{
- new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input, initialWeights)
+ new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(
+ input, initialWeights)
}
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. Each iteration uses
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
+
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[(Int, Array[Double])],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double)
: LogisticRegressionModel =
{
- new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input)
+ new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
}
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. We use the entire data set to update
- * the gradient in each iteration.
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. We use the entire data
+ * set to update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
+
* @param numIterations Number of iterations of gradient descent to run.
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[(Int, Array[Double])],
numIterations: Int,
stepSize: Double)
: LogisticRegressionModel =
@@ -209,16 +201,16 @@ object LogisticRegression {
}
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using a step size of 1.0. We use the entire data set to update
- * the gradient in each iteration.
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using a step size of 1.0. We use the entire data set
+ * to update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[(Int, Array[Double])],
numIterations: Int)
: LogisticRegressionModel =
{
@@ -226,13 +218,15 @@ object LogisticRegression {
}
def main(args: Array[String]) {
- if (args.length != 4) {
- println("Usage: LogisticRegression <master> <input_dir> <step_size> <niters>")
+ if (args.length != 5) {
+ println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
+ "<regularization_parameter> <niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "LogisticRegression")
- val data = MLUtils.loadLabeledData(sc, args(1))
- val model = LogisticRegression.train(data, args(3).toInt, args(2).toDouble)
+ val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
+ val model = LogisticRegressionLocalRandomSGD.train(
+ data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
new file mode 100644
index 0000000000..3a6a12814a
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.classification
+
+import scala.math.signum
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * SVM using Stochastic Gradient Descent.
+ */
+class SVMModel(
+ val weights: Array[Double],
+ val intercept: Double,
+ val stochasticLosses: Array[Double]) extends ClassificationModel {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+ override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
+ testData.map { x =>
+ signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
+ }
+ }
+
+ override def predict(testData: Array[Double]): Int = {
+ val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+ signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+ }
+}
+
+
+
+class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
+ var miniBatchFraction: Double, var numIters: Int)
+ extends Logging {
+
+ /**
+ * Construct a SVM object with default parameters
+ */
+ def this() = this(1.0, 1.0, 1.0, 100)
+
+ /**
+ * Set the step size per-iteration of SGD. Default 1.0.
+ */
+ def setStepSize(step: Double) = {
+ this.stepSize = step
+ this
+ }
+
+ /**
+ * Set the regularization parameter. Default 1.0.
+ */
+ def setRegParam(param: Double) = {
+ this.regParam = param
+ this
+ }
+
+ /**
+ * Set fraction of data to be used for each SGD iteration. Default 1.0.
+ */
+ def setMiniBatchFraction(fraction: Double) = {
+ this.miniBatchFraction = fraction
+ this
+ }
+
+ /**
+ * Set the number of iterations for SGD. Default 100.
+ */
+ def setNumIterations(iters: Int) = {
+ this.numIters = iters
+ this
+ }
+
+ def train(input: RDD[(Int, Array[Double])]): SVMModel = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ train(input, initialWeights)
+ }
+
+ def train(
+ input: RDD[(Int, Array[Double])],
+ initialWeights: Array[Double]): SVMModel = {
+
+ // Add a extra variable consisting of all 1.0's for the intercept.
+ val data = input.map { case (y, features) =>
+ (y.toDouble, Array(1.0, features:_*))
+ }
+
+ val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+ val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+ data,
+ new HingeGradient(),
+ new SquaredL2Updater(),
+ stepSize,
+ numIters,
+ regParam,
+ initalWeightsWithIntercept,
+ miniBatchFraction)
+
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
+
+ val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
+
+ logInfo("Final model weights " + model.weights.mkString(","))
+ logInfo("Final model intercept " + model.intercept)
+ logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
+ model
+ }
+}
+
+/**
+ * Top-level methods for calling SVM.
+
+
+ */
+object SVMLocalRandomSGD {
+
+ /**
+ * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[(Int, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : SVMModel =
+ {
+ new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
+ input, initialWeights)
+ }
+
+ /**
+ * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[(Int, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double)
+ : SVMModel =
+ {
+ new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ }
+
+ /**
+ * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param regParam Regularization parameter.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a SVMModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[(Int, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double)
+ : SVMModel =
+ {
+ train(input, numIterations, stepSize, regParam, 1.0)
+ }
+
+ /**
+ * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a SVMModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[(Int, Array[Double])],
+ numIterations: Int)
+ : SVMModel =
+ {
+ train(input, numIterations, 1.0, 1.0, 1.0)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ println("Usage: SVM <master> <input_dir> <step_size> <regularization_parameter> <niters>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SVM")
+ val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
+ val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
index d5338360c8..22b2ec5ed6 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -30,3 +30,48 @@ abstract class Gradient extends Serializable {
def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double)
}
+
+class LogisticGradient extends Gradient {
+ override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+ (DoubleMatrix, Double) = {
+ val margin: Double = -1.0 * data.dot(weights)
+ val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+ val gradient = data.mul(gradientMultiplier)
+ val loss =
+ if (margin > 0) {
+ math.log(1 + math.exp(0 - margin))
+ } else {
+ math.log(1 + math.exp(margin)) - margin
+ }
+
+ (gradient, loss)
+ }
+}
+
+
+class SquaredGradient extends Gradient {
+ override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+ (DoubleMatrix, Double) = {
+ val diff: Double = data.dot(weights) - label
+
+ val loss = 0.5 * diff * diff
+ val gradient = data.mul(diff)
+
+ (gradient, loss)
+ }
+}
+
+
+class HingeGradient extends Gradient {
+ override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+ (DoubleMatrix, Double) = {
+
+ val dotProduct = data.dot(weights)
+
+ if (1.0 > label * dotProduct)
+ (data.mul(-label), 1.0 - label * dotProduct)
+ else
+ (DoubleMatrix.zeros(1,weights.length), 0.0)
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 4c996c0903..19cda26446 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -36,12 +36,13 @@ object GradientDescent {
* @param updater - Updater object that will be used to update the model.
* @param stepSize - stepSize to be used during update.
* @param numIters - number of iterations that SGD should be run.
+ * @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
*
- * @return weights - Column matrix containing weights for every feature.
- * @return stochasticLossHistory - Array containing the stochastic loss computed for
- * every iteration.
+ * @return A tuple containing two elements. The first element is a column matrix containing
+ * weights for every feature, and the second element is an array containing the stochastic
+ * loss computed for every iteration.
*/
def runMiniBatchSGD(
data: RDD[(Double, Array[Double])],
@@ -49,6 +50,7 @@ object GradientDescent {
updater: Updater,
stepSize: Double,
numIters: Int,
+ regParam: Double,
initialWeights: Array[Double],
miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
@@ -59,7 +61,7 @@ object GradientDescent {
// Initialize weights as a column vector
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
- var reg_val = 0.0
+ var regVal = 0.0
for (i <- 1 to numIters) {
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
@@ -69,10 +71,14 @@ object GradientDescent {
(grad, loss)
}.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
- stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
- val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i)
+ /**
+ * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
+ * and regVal is the regularization value computed in the previous iteration as well.
+ */
+ stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
+ val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights = update._1
- reg_val = update._2
+ regVal = update._2
}
(weights.toArray, stochasticLossHistory.toArray)
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
index b864fd4634..3ebc1409b6 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
@@ -17,28 +17,68 @@
package spark.mllib.optimization
+import scala.math._
import org.jblas.DoubleMatrix
abstract class Updater extends Serializable {
/**
* Compute an updated value for weights given the gradient, stepSize and iteration number.
+ * Also returns the regularization value computed using the *updated* weights.
*
* @param weightsOld - Column matrix of size nx1 where n is the number of features.
* @param gradient - Column matrix of size nx1 where n is the number of features.
* @param stepSize - step size across iterations
* @param iter - Iteration number
+ * @param regParam - Regularization parameter
*
- * @return weightsNew - Column matrix containing updated weights
- * @return reg_val - regularization value
+ * @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
+ * and the second element is the regularization value computed using updated weights.
*/
- def compute(weightsOlds: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int):
- (DoubleMatrix, Double)
+ def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int,
+ regParam: Double): (DoubleMatrix, Double)
}
class SimpleUpdater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
- stepSize: Double, iter: Int): (DoubleMatrix, Double) = {
- val normGradient = gradient.mul(stepSize / math.sqrt(iter))
+ stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+ val thisIterStepSize = stepSize / math.sqrt(iter)
+ val normGradient = gradient.mul(thisIterStepSize)
(weightsOld.sub(normGradient), 0)
}
}
+
+/**
+ * L1 regularization -- corresponding proximal operator is the soft-thresholding function
+ * That is, each weight component is shrunk towards 0 by shrinkageVal
+ * If w > shrinkageVal, set weight component to w-shrinkageVal.
+ * If w < -shrinkageVal, set weight component to w+shrinkageVal.
+ * If -shrinkageVal < w < shrinkageVal, set weight component to 0.
+ * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
+ */
+class L1Updater extends Updater {
+ override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
+ stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+ val thisIterStepSize = stepSize / math.sqrt(iter)
+ val normGradient = gradient.mul(thisIterStepSize)
+ // Take gradient step
+ val newWeights = weightsOld.sub(normGradient)
+ // Soft thresholding
+ val shrinkageVal = regParam * thisIterStepSize
+ (0 until newWeights.length).foreach { i =>
+ val wi = newWeights.get(i)
+ newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
+ }
+ (newWeights, newWeights.norm1 * regParam)
+ }
+}
+
+class SquaredL2Updater extends Updater {
+ override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
+ stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+ val thisIterStepSize = stepSize / math.sqrt(iter)
+ val normGradient = gradient.mul(thisIterStepSize)
+ val newWeights = weightsOld.sub(normGradient).div(2.0 * thisIterStepSize * regParam + 1.0)
+ (newWeights, pow(newWeights.norm2, 2.0) * regParam)
+ }
+}
+
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 7da96397a6..7281b2fcb9 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -35,8 +35,7 @@ import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
* of the elements within this block, and the list of destination blocks that each user or
* product will need to send its feature vector to.
*/
-private[recommendation] case class OutLinkBlock(
- elementIds: Array[Int], shouldSend: Array[BitSet])
+private[recommendation] case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[BitSet])
/**
@@ -105,7 +104,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
}
/**
- * Run ALS with the configured parmeters on an input RDD of (user, product, rating) triples.
+ * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
*/
def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = {
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
new file mode 100644
index 0000000000..e8b1ed8a48
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Lasso using Stochastic Gradient Descent.
+ *
+ */
+class LassoModel(
+ val weights: Array[Double],
+ val intercept: Double,
+ val stochasticLosses: Array[Double]) extends RegressionModel {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+ override def predict(testData: spark.RDD[Array[Double]]) = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
+ testData.map { x =>
+ new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
+ }
+ }
+
+
+ override def predict(testData: Array[Double]): Double = {
+ val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+ dataMat.dot(weightsMatrix) + this.intercept
+ }
+}
+
+
+class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
+ var miniBatchFraction: Double, var numIters: Int)
+ extends Logging {
+
+ /**
+ * Construct a Lasso object with default parameters
+ */
+ def this() = this(1.0, 1.0, 1.0, 100)
+
+ /**
+ * Set the step size per-iteration of SGD. Default 1.0.
+ */
+ def setStepSize(step: Double) = {
+ this.stepSize = step
+ this
+ }
+
+ /**
+ * Set the regularization parameter. Default 1.0.
+ */
+ def setRegParam(param: Double) = {
+ this.regParam = param
+ this
+ }
+
+ /**
+ * Set fraction of data to be used for each SGD iteration. Default 1.0.
+ */
+ def setMiniBatchFraction(fraction: Double) = {
+ this.miniBatchFraction = fraction
+ this
+ }
+
+ /**
+ * Set the number of iterations for SGD. Default 100.
+ */
+ def setNumIterations(iters: Int) = {
+ this.numIters = iters
+ this
+ }
+
+ def train(input: RDD[(Double, Array[Double])]): LassoModel = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ train(input, initialWeights)
+ }
+
+ def train(
+ input: RDD[(Double, Array[Double])],
+ initialWeights: Array[Double]): LassoModel = {
+
+ // Add a extra variable consisting of all 1.0's for the intercept.
+ val data = input.map { case (y, features) =>
+ (y, Array(1.0, features:_*))
+ }
+
+ val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+ val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+ data,
+ new SquaredGradient(),
+ new L1Updater(),
+ stepSize,
+ numIters,
+ regParam,
+ initalWeightsWithIntercept,
+ miniBatchFraction)
+
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
+
+ val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
+
+ logInfo("Final model weights " + model.weights.mkString(","))
+ logInfo("Final model intercept " + model.intercept)
+ logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
+ model
+ }
+}
+
+/**
+ * Top-level methods for calling Lasso.
+ *
+ *
+ */
+object LassoLocalRandomSGD {
+
+ /**
+ * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : LassoModel =
+ {
+ new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
+ input, initialWeights)
+ }
+
+ /**
+ * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double)
+ : LassoModel =
+ {
+ new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ }
+
+ /**
+ * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param regParam Regularization parameter.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LassoModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double)
+ : LassoModel =
+ {
+ train(input, numIterations, stepSize, regParam, 1.0)
+ }
+
+ /**
+ * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LassoModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int)
+ : LassoModel =
+ {
+ train(input, numIterations, 1.0, 1.0, 1.0)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ println("Usage: Lasso <master> <input_dir> <step_size> <regularization_parameter> <niters>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Lasso")
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/Regression.scala b/mllib/src/main/scala/spark/mllib/regression/RegressionModel.scala
index 645204ddf3..b845ba1a89 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Regression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RegressionModel.scala
@@ -19,7 +19,7 @@ package spark.mllib.regression
import spark.RDD
-trait RegressionModel {
+trait RegressionModel extends Serializable {
/**
* Predict values for the given data set using the model trained.
*
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
index f724edd732..6ba141e8fb 100644
--- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -37,8 +37,11 @@ class RidgeRegressionModel(
extends RegressionModel {
override def predict(testData: RDD[Array[Double]]): RDD[Double] = {
+ // A small optimization to avoid serializing the entire model.
+ val localIntercept = this.intercept
+ val localWeights = this.weights
testData.map { x =>
- (new DoubleMatrix(1, x.length, x:_*).mmul(this.weights)).get(0) + this.intercept
+ (new DoubleMatrix(1, x.length, x:_*).mmul(localWeights)).get(0) + localIntercept
}
}
diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
new file mode 100644
index 0000000000..ef4f42a494
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
@@ -0,0 +1,45 @@
+package spark.mllib.regression
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+object LassoGenerator {
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ println("Usage: LassoGenerator " +
+ "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+ System.exit(1)
+ }
+
+ val sparkMaster: String = args(0)
+ val outputPath: String = args(1)
+ val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
+ val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
+ val parts: Int = if (args.length > 4) args(4).toInt else 2
+ val eps = 3
+
+ val sc = new SparkContext(sparkMaster, "LassoGenerator")
+
+ val globalRnd = new Random(94720)
+ val trueWeights = new DoubleMatrix(1, nfeatures+1,
+ Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
+
+ val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+ val rnd = new Random(42 + idx)
+
+ val x = Array.fill[Double](nfeatures) {
+ rnd.nextDouble() * 2.0 - 1.0
+ }
+ val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
+ (y, x)
+ }
+
+ MLUtils.saveLabeledData(data, outputPath)
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
index b5e564df6d..25d9673004 100644
--- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -38,9 +38,9 @@ object MLUtils {
*/
def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = {
sc.textFile(dir).map { line =>
- val parts = line.split(",")
+ val parts = line.split(',')
val label = parts(0).toDouble
- val features = parts(1).trim().split(" ").map(_.toDouble)
+ val features = parts(1).trim().split(' ').map(_.toDouble)
(label, features)
}
}
diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
new file mode 100644
index 0000000000..00a54d9a70
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
@@ -0,0 +1,48 @@
+package spark.mllib.classification
+
+import scala.util.Random
+import scala.math.signum
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+object SVMGenerator {
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ println("Usage: SVMGenerator " +
+ "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+ System.exit(1)
+ }
+
+ val sparkMaster: String = args(0)
+ val outputPath: String = args(1)
+ val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
+ val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
+ val parts: Int = if (args.length > 4) args(4).toInt else 2
+ val eps = 3
+
+ val sc = new SparkContext(sparkMaster, "SVMGenerator")
+
+ val globalRnd = new Random(94720)
+ val trueWeights = new DoubleMatrix(1, nfeatures+1,
+ Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
+
+ val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+ val rnd = new Random(42 + idx)
+
+ val x = Array.fill[Double](nfeatures) {
+ rnd.nextDouble() * 2.0 - 1.0
+ }
+ val y = signum((new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1)
+ (y, x)
+ }
+
+ MLUtils.saveLabeledData(data, outputPath)
+ sc.stop()
+ }
+}
diff --git a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 47191d9a5a..8664263935 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package spark.mllib.regression
+package spark.mllib.classification
import scala.util.Random
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
import spark.SparkContext
-import spark.SparkContext._
-class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
+class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
val sc = new SparkContext("local", "test")
override def afterAll() {
@@ -36,10 +36,11 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
// Generate input of the form Y = logistic(offset + scale*X)
def generateLogisticInput(
- offset: Double,
- scale: Double,
- nPoints: Int) : Seq[(Double, Array[Double])] = {
- val rnd = new Random(42)
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): Seq[(Int, Array[Double])] = {
+ val rnd = new Random(seed)
val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
// NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1)
@@ -51,33 +52,49 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
// y <- A + B*x + rLogis()
// y <- as.numeric(y > 0)
- val y = (0 until nPoints).map { i =>
+ val y: Seq[Int] = (0 until nPoints).map { i =>
val yVal = offset + scale * x1(i) + rLogis(i)
- if (yVal > 0) 1.0 else 0.0
+ if (yVal > 0) 1 else 0
}
- val testData = (0 until nPoints).map(i => (y(i).toDouble, Array(x1(i))))
+ val testData = (0 until nPoints).map(i => (y(i), Array(x1(i))))
testData
}
+ def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
+ (prediction != expected)
+ }.size
+ // At least 83% of the predictions should be on.
+ ((input.length - numOffPredictions).toDouble / input.length) should be > 0.83
+ }
+
// Test if we can correctly learn A, B where Y = logistic(A + B*X)
test("logistic regression") {
val nPoints = 10000
val A = 2.0
val B = -1.5
- val testData = generateLogisticInput(A, B, nPoints)
+ val testData = generateLogisticInput(A, B, nPoints, 42)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val lr = new LogisticRegression().setStepSize(10.0)
- .setNumIterations(20)
+ val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(20)
val model = lr.train(testRDD)
+ // Test the weights
val weight0 = model.weights(0)
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+ val validationData = generateLogisticInput(A, B, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
}
test("logistic regression with initial weights") {
@@ -85,7 +102,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
val A = 2.0
val B = -1.5
- val testData = generateLogisticInput(A, B, nPoints)
+ val testData = generateLogisticInput(A, B, nPoints, 42)
val initialB = -1.0
val initialWeights = Array(initialB)
@@ -94,13 +111,20 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
testRDD.cache()
// Use half as many iterations as the previous test.
- val lr = new LogisticRegression().setStepSize(10.0)
- .setNumIterations(10)
+ val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(10)
val model = lr.train(testRDD, initialWeights)
val weight0 = model.weights(0)
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+ val validationData = generateLogisticInput(A, B, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
}
}
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
new file mode 100644
index 0000000000..d546e0729e
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.classification
+
+import scala.util.Random
+import scala.math.signum
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+
+import org.jblas.DoubleMatrix
+
+class SVMSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise)
+ def generateSVMInput(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): Seq[(Int, Array[Double])] = {
+ val rnd = new Random(seed)
+ val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+ val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val y = x.map(xi =>
+ signum((new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()).toInt
+ )
+ y zip x
+ }
+
+ def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
+ (prediction != expected)
+ }.size
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+ test("SVMLocalRandomSGD") {
+ val nPoints = 10000
+
+ val A = 2.0
+ val B = -1.5
+ val C = 1.0
+
+ val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+ val model = svm.train(testRDD)
+
+ val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData,2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ }
+
+ test("SVMLocalRandomSGD with initial weights") {
+ val nPoints = 10000
+
+ val A = 2.0
+ val B = -1.5
+ val C = 1.0
+
+ val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+ val initialB = -1.0
+ val initialC = -1.0
+ val initialWeights = Array(initialB,initialC)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+ val model = svm.train(testRDD, initialWeights)
+
+ val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData,2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ }
+}
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
new file mode 100644
index 0000000000..cf2b067d40
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+
+import org.jblas.DoubleMatrix
+
+
+class LassoSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ // Generate noisy input of the form Y = x.dot(weights) + intercept + noise
+ def generateLassoInput(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): Seq[(Double, Array[Double])] = {
+ val rnd = new Random(seed)
+ val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+ val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val y = x.map(xi =>
+ (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
+ )
+ y zip x
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ math.abs(prediction - expected) > 0.5
+ }.size
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+ test("LassoLocalRandomSGD") {
+ val nPoints = 10000
+
+ val A = 2.0
+ val B = -1.5
+ val C = 1.0e-2
+
+ val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+ val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val model = ls.train(testRDD)
+
+ val weight0 = model.weights(0)
+ val weight1 = model.weights(1)
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+
+ val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData,2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ }
+
+ test("LassoLocalRandomSGD with initial weights") {
+ val nPoints = 10000
+
+ val A = 2.0
+ val B = -1.5
+ val C = 1.0e-2
+
+ val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42)
+
+ val initialB = -1.0
+ val initialC = -1.0
+ val initialWeights = Array(initialB,initialC)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+ val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val model = ls.train(testRDD, initialWeights)
+
+ val weight0 = model.weights(0)
+ val weight1 = model.weights(1)
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+
+ val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData,2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ }
+}
diff --git a/pom.xml b/pom.xml
index eb7bd7e9df..f8fbc12aa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,10 +58,10 @@
<module>core</module>
<module>bagel</module>
<module>examples</module>
+ <module>mllib</module>
<module>tools</module>
<module>streaming</module>
<module>repl</module>
- <module>repl-bin</module>
</modules>
<properties>
@@ -184,6 +184,11 @@
<version>0.8.4</version>
</dependency>
<dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.5</version>
+ </dependency>
+ <dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>4.0</version>
@@ -194,9 +199,14 @@
<version>2.4.1</version>
</dependency>
<dependency>
- <groupId>de.javakaffee</groupId>
- <artifactId>kryo-serializers</artifactId>
- <version>0.22</version>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_2.9.3</artifactId>
+ <version>0.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-java</artifactId>
+ <version>0.3.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
@@ -250,6 +260,16 @@
<version>2.5</version>
</dependency>
<dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ <version>3.0.0</version>
+ </dependency>
+ <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
@@ -619,5 +639,25 @@
</dependencies>
</dependencyManagement>
</profile>
+ <profile>
+ <id>assembly</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <modules>
+ <module>assembly</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>expensive-modules</id>
+ <activation>
+ <property>
+ <name>!noExpensive</name>
+ </property>
+ </activation>
+ <modules>
+ <module>repl-bin</module>
+ </modules>
+ </profile>
</profiles>
</project>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f3f67b57c8..bb96ad4ae3 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -44,7 +44,7 @@ object SparkBuild extends Build {
lazy val core = Project("core", file("core"), settings = coreSettings)
- lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core)
+ lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn(bagel) dependsOn(mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming)
@@ -151,6 +151,7 @@ object SparkBuild extends Build {
val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
val excludeAsm = ExclusionRule(organization = "asm")
+ val excludeSnappy = ExclusionRule(organization = "org.xerial.snappy")
def coreSettings = sharedSettings ++ Seq(
name := "spark-core",
@@ -168,18 +169,22 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"commons-daemon" % "commons-daemon" % "1.0.10",
"com.ning" % "compress-lzf" % "0.8.4",
+ "org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
- "de.javakaffee" % "kryo-serializers" % "0.22",
"com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-remote" % "2.0.5" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"net.liftweb" % "lift-json_2.9.2" % "2.5",
- "org.apache.mesos" % "mesos" % "0.9.0-incubating",
+ "org.apache.mesos" % "mesos" % "0.12.0-incubating",
"io.netty" % "netty-all" % "4.0.0.Beta2",
- "org.apache.derby" % "derby" % "10.4.2.0" % "test"
+ "org.apache.derby" % "derby" % "10.4.2.0" % "test",
+ "com.codahale.metrics" % "metrics-core" % "3.0.0",
+ "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+ "com.twitter" % "chill_2.9.3" % "0.3.0",
+ "com.twitter" % "chill-java" % "0.3.0"
) ++ (
if (HADOOP_MAJOR_VERSION == "2") {
if (HADOOP_YARN) {
@@ -232,6 +237,7 @@ object SparkBuild extends Build {
exclude("jline","jline")
exclude("log4j","log4j")
exclude("org.apache.cassandra.deps", "avro")
+ excludeAll(excludeSnappy)
)
)
@@ -239,7 +245,9 @@ object SparkBuild extends Build {
name := "spark-tools"
)
- def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
+ def bagelSettings = sharedSettings ++ Seq(
+ name := "spark-bagel"
+ )
def mllibSettings = sharedSettings ++ Seq(
name := "spark-mllib",
@@ -254,7 +262,7 @@ object SparkBuild extends Build {
"Akka Repository" at "http://repo.akka.io/releases/"
),
libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty),
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty)
diff --git a/pyspark b/pyspark
index 37a355462e..801239c108 100755
--- a/pyspark
+++ b/pyspark
@@ -53,9 +53,13 @@ if [[ "$SPARK_LAUNCH_WITH_SCALA" != "0" ]] ; then
export SPARK_LAUNCH_WITH_SCALA=1
fi
+if [ -n "$IPYTHON_OPTS" ]; then
+ IPYTHON=1
+fi
+
if [[ "$IPYTHON" = "1" ]] ; then
- export PYSPARK_PYTHON="ipython"
- exec "$PYSPARK_PYTHON" -i -c "%run $PYTHONSTARTUP"
+ IPYTHON_OPTS=${IPYTHON_OPTS:--i}
+ exec ipython "$IPYTHON_OPTS" -c "%run $PYTHONSTARTUP"
else
- exec "$PYSPARK_PYTHON" "$@"
+ exec "$PYSPARK_PYTHON" "$@"
fi
diff --git a/python/examples/als.py b/python/examples/als.py
index f2b2eee64c..a77dfb2577 100755
--- a/python/examples/als.py
+++ b/python/examples/als.py
@@ -48,8 +48,7 @@ def update(i, vec, mat, ratings):
if __name__ == "__main__":
if len(sys.argv) < 2:
- print >> sys.stderr, \
- "Usage: PythonALS <master> <M> <U> <F> <iters> <slices>"
+ print >> sys.stderr, "Usage: als <master> <M> <U> <F> <iters> <slices>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)])
M = int(sys.argv[2]) if len(sys.argv) > 2 else 100
@@ -84,5 +83,5 @@ if __name__ == "__main__":
usb = sc.broadcast(us)
error = rmse(R, ms, us)
- print "Iteration %d:" % i
+ print "Iteration %d:" % i
print "\nRMSE: %5.4f\n" % error
diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py
index c670556f2b..ba31af92fc 100644..100755
--- a/python/examples/kmeans.py
+++ b/python/examples/kmeans.py
@@ -41,8 +41,7 @@ def closestPoint(p, centers):
if __name__ == "__main__":
if len(sys.argv) < 5:
- print >> sys.stderr, \
- "Usage: PythonKMeans <master> <file> <k> <convergeDist>"
+ print >> sys.stderr, "Usage: kmeans <master> <file> <k> <convergeDist>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonKMeans")
lines = sc.textFile(sys.argv[2])
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
index 54d227d0d3..3ac1bae4e9 100755
--- a/python/examples/logistic_regression.py
+++ b/python/examples/logistic_regression.py
@@ -35,7 +35,7 @@ np.random.seed(42)
DataPoint = namedtuple("DataPoint", ['x', 'y'])
-from lr import DataPoint # So that DataPoint is properly serialized
+from logistic_regression import DataPoint # So that DataPoint is properly serialized
def generateData():
@@ -48,8 +48,7 @@ def generateData():
if __name__ == "__main__":
if len(sys.argv) == 1:
- print >> sys.stderr, \
- "Usage: PythonLR <master> [<slices>]"
+ print >> sys.stderr, "Usage: logistic_regression <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
diff --git a/python/examples/pi.py b/python/examples/pi.py
index 33c026e824..ab0645fc2f 100644..100755
--- a/python/examples/pi.py
+++ b/python/examples/pi.py
@@ -24,8 +24,7 @@ from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) == 1:
- print >> sys.stderr, \
- "Usage: PythonPi <master> [<slices>]"
+ print >> sys.stderr, "Usage: pi <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonPi")
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
index 40be3b5000..744cce6651 100644..100755
--- a/python/examples/transitive_closure.py
+++ b/python/examples/transitive_closure.py
@@ -37,10 +37,9 @@ def generateGraph():
if __name__ == "__main__":
if len(sys.argv) == 1:
- print >> sys.stderr, \
- "Usage: PythonTC <master> [<slices>]"
+ print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]"
exit(-1)
- sc = SparkContext(sys.argv[1], "PythonTC")
+ sc = SparkContext(sys.argv[1], "PythonTransitiveClosure")
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
tc = sc.parallelize(generateGraph(), slices).cache()
diff --git a/python/examples/wordcount.py b/python/examples/wordcount.py
index 41c846ba79..a6de22766a 100644..100755
--- a/python/examples/wordcount.py
+++ b/python/examples/wordcount.py
@@ -23,8 +23,7 @@ from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) < 3:
- print >> sys.stderr, \
- "Usage: PythonWordCount <master> <file>"
+ print >> sys.stderr, "Usage: wordcount <master> <file>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonWordCount")
lines = sc.textFile(sys.argv[2], 1)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 2f741cb345..c2b49ff37a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -141,14 +141,21 @@ class SparkContext(object):
def parallelize(self, c, numSlices=None):
"""
Distribute a local Python collection to form an RDD.
+
+ >>> sc.parallelize(range(5), 5).glom().collect()
+ [[0], [1], [2], [3], [4]]
"""
numSlices = numSlices or self.defaultParallelism
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
# objects are written to a file and loaded through textFile().
tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
- if self.batchSize != 1:
- c = batched(c, self.batchSize)
+ # Make sure we distribute data evenly if it's smaller than self.batchSize
+ if "__len__" not in dir(c):
+ c = list(c) # Make it a list so we can compute its length
+ batchSize = min(len(c) // numSlices, self.batchSize)
+ if batchSize > 1:
+ c = batched(c, batchSize)
for x in c:
write_with_length(dump_pickle(x), tempFile)
tempFile.close()
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index c6a6b24c5a..51c2cb9806 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -160,7 +160,7 @@ class RDD(object):
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
"""
- return self.map(lambda x: (x, "")) \
+ return self.map(lambda x: (x, None)) \
.reduceByKey(lambda x, _: x) \
.map(lambda (x, _): x)
@@ -267,7 +267,11 @@ class RDD(object):
>>> def f(x): print x
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
"""
- self.map(f).collect() # Force evaluation
+ def processPartition(iterator):
+ for x in iterator:
+ f(x)
+ yield None
+ self.mapPartitions(processPartition).collect() # Force evaluation
def collect(self):
"""
@@ -386,13 +390,16 @@ class RDD(object):
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
"""
+ def takeUpToNum(iterator):
+ taken = 0
+ while taken < num:
+ yield next(iterator)
+ taken += 1
+ # Take only up to num elements from each partition we try
+ mapped = self.mapPartitions(takeUpToNum)
items = []
- for partition in range(self._jrdd.splits().size()):
- iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
- # Each item in the iterator is a string, Python object, batch of
- # Python objects. Regardless, it is sufficient to take `num`
- # of these objects in order to collect `num` Python objects:
- iterator = iterator.take(num)
+ for partition in range(mapped._jrdd.splits().size()):
+ iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
items.extend(self._collect_iterator_through_file(iterator))
if len(items) >= num:
break
@@ -749,9 +756,8 @@ class PipelinedRDD(RDD):
self.ctx._gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
class_manifest = self._prev_jrdd.classManifest()
- env = copy.copy(self.ctx.environment)
- env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "")
- env = MapConverter().convert(env, self.ctx._gateway._gateway_client)
+ env = MapConverter().convert(self.ctx.environment,
+ self.ctx._gateway._gateway_client)
python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator, class_manifest)
diff --git a/python/run-tests b/python/run-tests
index 1ee947d414..6643faa2e0 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -21,6 +21,9 @@
# Figure out where the Spark framework is installed
FWDIR="$(cd `dirname $0`; cd ../; pwd)"
+# CD into the python directory to find things on the right path
+cd "$FWDIR/python"
+
FAILED=0
$FWDIR/pyspark pyspark/rdd.py
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index 59f9d05683..0cecbd71ad 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -831,6 +831,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
var sparkContext: SparkContext = null
def createSparkContext(): SparkContext = {
+ val uri = System.getenv("SPARK_EXECUTOR_URI")
+ if (uri != null) {
+ System.setProperty("spark.executor.uri", uri)
+ }
val master = this.master match {
case Some(m) => m
case None => {
diff --git a/run b/run
index f49aa92796..0a440627a1 100755
--- a/run
+++ b/run
@@ -17,6 +17,8 @@
# limitations under the License.
#
+SCALA_VERSION=2.9.3
+
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
@@ -70,7 +72,10 @@ esac
# hard to kill the child with stuff like Process.destroy(). However, for
# the Spark shell, the wrapper is necessary to properly reset the terminal
# when we exit, so we allow it to set a variable to launch with scala.
-if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
+# We still fall back on java for the shell if this is a "release" created
+# from make-distribution.sh since it's possible scala is not installed
+# but we have everything we need to run the shell.
+if [[ "$SPARK_LAUNCH_WITH_SCALA" == "1" && ! -f "$FWDIR/RELEASE" ]]; then
if [ "$SCALA_HOME" ]; then
RUNNER="${SCALA_HOME}/bin/scala"
else
@@ -136,6 +141,17 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
echo "You need to compile Spark repl module before running this program" >&2
exit 1
fi
+
+ # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
+ # to avoid the -sources and -doc packages that are built by publish-local.
+ if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
+ # Use the JAR from the SBT build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
+ fi
+ if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
+ # Use the JAR from the Maven build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+ fi
fi
# Compute classpath using external script
diff --git a/spark-shell b/spark-shell
index 31a4138124..62fc18550d 100755
--- a/spark-shell
+++ b/spark-shell
@@ -79,6 +79,7 @@ if [[ ! $? ]]; then
saved_stty=""
fi
+export SPARK_LAUNCH_WITH_SCALA=${SPARK_LAUNCH_WITH_SCALA:-1}
$FWDIR/run $OPTIONS spark.repl.Main "$@"
# record the exit status lest it be overwritten:
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 1e4c1e3742..070d930b5e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,16 +17,17 @@
package spark.streaming
-import spark.{Logging, Utils}
-
-import org.apache.hadoop.fs.{FileUtil, Path}
-import org.apache.hadoop.conf.Configuration
-
import java.io._
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+import spark.Logging
+import spark.io.CompressionCodec
+
+
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
@@ -49,6 +50,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
+
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
@@ -66,6 +68,8 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
+ private val compressionCodec = CompressionCodec.createCodec()
+
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
@@ -103,7 +107,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
def write(checkpoint: Checkpoint) {
val bos = new ByteArrayOutputStream()
- val zos = new LZFOutputStream(bos)
+ val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
oos.writeObject(checkpoint)
oos.close()
@@ -137,6 +141,8 @@ object CheckpointReader extends Logging {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+ val compressionCodec = CompressionCodec.createCodec()
+
attempts.foreach(file => {
if (fs.exists(file)) {
logInfo("Attempting to load checkpoint from file '" + file + "'")
@@ -147,7 +153,7 @@ object CheckpointReader extends Logging {
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val zis = new LZFInputStream(fis)
+ val zis = compressionCodec.compressedInputStream(fis)
val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
@@ -170,7 +176,9 @@ object CheckpointReader extends Logging {
}
private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
+ extends ObjectInputStream(inputStream_) {
+
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
return loader.loadClass(desc.getName())
diff --git a/tools/pom.xml b/tools/pom.xml
index 3dfba5808c..878eb82f18 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -20,28 +20,24 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.4-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.spark-project</groupId>
- <artifactId>spark-tools_2.9.3</artifactId>
+ <artifactId>spark-tools</artifactId>
<packaging>jar</packaging>
<name>Spark Project Tools</name>
<url>http://spark-project.org/</url>
<dependencies>
<dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-streaming_2.9.3</artifactId>
- <version>${project.version}</version>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
+
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
@@ -60,4 +56,121 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2-yarn</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2-yarn</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2-yarn</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>