aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-02 23:15:55 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-02 23:15:55 -0800
commit30b9db0abedbef7afa06a927a011147cfeca2a70 (patch)
treef53dd9dae7030a8a97d26e59749443cd0f78607a
parent498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95 (diff)
parentf442afc22ef2e8a10ea22d5a7b392d41a1c7fdf8 (diff)
downloadspark-30b9db0abedbef7afa06a927a011147cfeca2a70.tar.gz
spark-30b9db0abedbef7afa06a927a011147cfeca2a70.tar.bz2
spark-30b9db0abedbef7afa06a927a011147cfeca2a70.zip
Merge pull request #285 from colorant/yarn-refactor
Yarn refactor
-rw-r--r--assembly/pom.xml12
-rw-r--r--docs/building-with-maven.md14
-rw-r--r--docs/running-on-yarn.md3
-rw-r--r--new-yarn/pom.xml161
-rw-r--r--pom.xml59
-rw-r--r--project/SparkBuild.scala32
-rw-r--r--yarn/README.md12
-rw-r--r--yarn/alpha/pom.xml32
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala (renamed from yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala)0
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala (renamed from yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala)0
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala (renamed from yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala)15
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala (renamed from yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala)2
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala (renamed from yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala)0
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala)0
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala)0
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala)0
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala)0
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala (renamed from new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala)0
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala (renamed from new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala)0
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala (renamed from new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala)0
-rw-r--r--yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala (renamed from new-yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala)0
-rw-r--r--yarn/pom.xml84
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala94
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala147
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala228
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala43
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala48
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala110
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala59
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala220
-rw-r--r--yarn/stable/pom.xml32
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala)0
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala)0
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala)6
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala)0
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala (renamed from new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala)2
36 files changed, 189 insertions, 1226 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml
index fc2adc1fbb..079509bec6 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -124,7 +124,17 @@
<profiles>
<profile>
- <id>hadoop2-yarn</id>
+ <id>yarn-alpha</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index c709001632..b9ff0af76f 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -37,20 +37,16 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions wit
# Cloudera CDH 4.2.0 with MapReduce v1
$ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
-For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "hadoop2-yarn" profile and set the "yarn.version" property:
+For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "yarn-alpha" or "yarn" profile and set the "hadoop.version", "yarn.version" property:
# Apache Hadoop 2.0.5-alpha
- $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
+ $ mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
# Cloudera CDH 4.2.0 with MapReduce v2
- $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-chd4.2.0 -DskipTests clean package
+ $ mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-chd4.2.0 -DskipTests clean package
-Hadoop versions 2.2.x and newer can be built by setting the ```new-yarn``` and the ```yarn.version``` as follows:
-
- # Apache Hadoop 2.2.X and newer
- $ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
-
-The build process handles Hadoop 2.2.x as a special case that uses the directory ```new-yarn```, which supports the new YARN API. Furthermore, for this version, the build depends on artifacts published by the spark-project to enable Akka 2.0.5 to work with protobuf 2.5.
+ # Apache Hadoop 2.2.X ( e.g. 2.2.0 as below ) and newer
+ $ mvn -Pyarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTests clean package
## Spark Tests in Maven ##
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index aa75ca4324..2c1b2cc294 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -116,8 +116,6 @@ For example:
# Building Spark for Hadoop/YARN 2.2.x
-Hadoop 2.2.x users must build Spark and publish it locally. The SBT build process handles Hadoop 2.2.x as a special case. This version of Hadoop has new YARN API changes and depends on a Protobuf version (2.5) that is not compatible with the Akka version (2.0.5) that Spark uses. Therefore, if the Hadoop version (e.g. set through ```SPARK_HADOOP_VERSION```) starts with 2.2.0 or higher then the build process will depend on Akka artifacts distributed by the Spark project compatible with Protobuf 2.5. Furthermore, the build process then uses the directory ```new-yarn``` (instead of ```yarn```), which supports the new YARN API. The build process should seamlessly work out of the box.
-
See [Building Spark with Maven](building-with-maven.html) for instructions on how to build Spark using the Maven process.
# Important Notes
@@ -126,4 +124,3 @@ See [Building Spark with Maven](building-with-maven.html) for instructions on ho
- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
-- YARN 2.2.x users cannot simply depend on the Spark packages without building Spark, as the published Spark artifacts are compiled to work with the pre 2.2 API. Those users must build Spark and publish it locally.
diff --git a/new-yarn/pom.xml b/new-yarn/pom.xml
deleted file mode 100644
index 4cd28f34e3..0000000000
--- a/new-yarn/pom.xml
+++ /dev/null
@@ -1,161 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent</artifactId>
- <version>0.9.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.10</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project YARN Support</name>
- <url>http://spark.incubator.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${yarn.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.10</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>reference.conf</resource>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <phase>test</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <exportAntProperties>true</exportAntProperties>
- <tasks>
- <property name="spark.classpath" refid="maven.test.classpath" />
- <property environment="env" />
- <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
- <condition>
- <not>
- <or>
- <isset property="env.SCALA_HOME" />
- <isset property="env.SCALA_LIBRARY_PATH" />
- </or>
- </not>
- </condition>
- </fail>
- </tasks>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <configuration>
- <environmentVariables>
- <SPARK_HOME>${basedir}/..</SPARK_HOME>
- <SPARK_TESTING>1</SPARK_TESTING>
- <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
- </environmentVariables>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/pom.xml b/pom.xml
index 3a8eb882cc..aa2f076aac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -722,7 +722,7 @@
<profiles>
<profile>
- <id>hadoop2-yarn</id>
+ <id>yarn-alpha</id>
<properties>
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
@@ -735,57 +735,20 @@
<module>yarn</module>
</modules>
- <repositories>
- <repository>
- <id>maven-root</id>
- <name>Maven root repository</name>
- <url>http://repo1.maven.org/maven2</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
- <dependencyManagement>
- <dependencies>
- </dependencies>
- </dependencyManagement>
</profile>
<profile>
- <id>new-yarn</id>
- <properties>
- <hadoop.major.version>2</hadoop.major.version>
- <hadoop.version>2.2.0</hadoop.version>
- <protobuf.version>2.5.0</protobuf.version>
- </properties>
-
- <modules>
- <module>new-yarn</module>
- </modules>
-
- <repositories>
- <repository>
- <id>maven-root</id>
- <name>Maven root repository</name>
- <url>http://repo1.maven.org/maven2</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
+ <id>yarn</id>
+ <properties>
+ <hadoop.major.version>2</hadoop.major.version>
+ <hadoop.version>2.2.0</hadoop.version>
+ <protobuf.version>2.5.0</protobuf.version>
+ </properties>
+ <modules>
+ <module>yarn</module>
+ </modules>
- <dependencyManagement>
- <dependencies>
- </dependencies>
- </dependencyManagement>
- </profile>
+ </profile>
<profile>
<id>repl-bin</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2eef2dfa5e..b0749cc9c4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -85,12 +85,11 @@ object SparkBuild extends Build {
}
// Conditionally include the yarn sub-project
- lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
+ lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
+ lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core)
- //lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
-
- lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
- lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
+ lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]()
+ lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]()
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
@@ -320,10 +319,29 @@ object SparkBuild extends Build {
)
)
- def yarnSettings = sharedSettings ++ Seq(
- name := "spark-yarn"
+ def yarnCommonSettings = sharedSettings ++ Seq(
+ unmanagedSourceDirectories in Compile <++= baseDirectory { base =>
+ Seq(
+ base / "../common/src/main/scala"
+ )
+ },
+
+ unmanagedSourceDirectories in Test <++= baseDirectory { base =>
+ Seq(
+ base / "../common/src/test/scala"
+ )
+ }
+
) ++ extraYarnSettings
+ def yarnAlphaSettings = yarnCommonSettings ++ Seq(
+ name := "spark-yarn-alpha"
+ )
+
+ def yarnSettings = yarnCommonSettings ++ Seq(
+ name := "spark-yarn"
+ )
+
// Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
// if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()
diff --git a/yarn/README.md b/yarn/README.md
new file mode 100644
index 0000000000..65ee85447e
--- /dev/null
+++ b/yarn/README.md
@@ -0,0 +1,12 @@
+# YARN DIRECTORY LAYOUT
+
+Hadoop Yarn related codes are organized in separate directories to minimize duplicated code.
+
+ * common : Common codes that do not depending on specific version of Hadoop.
+
+ * alpha / stable : Codes that involve specific version of Hadoop YARN API.
+
+ alpha represents 0.23 and 2.0.x
+ stable represents 2.2 and later, until the API changes again.
+
+alpha / stable will build together with common dir into a single jar
diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml
new file mode 100644
index 0000000000..8291e9e7a3
--- /dev/null
+++ b/yarn/alpha/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>yarn-parent_2.10</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn-alpha_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project YARN Alpha API</name>
+
+</project>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7cf120d3eb..7cf120d3eb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2bd047c97a..2bd047c97a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index a8de89c670..e64530702c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -39,28 +39,29 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
def this(args: ApplicationMasterArguments) = this(args, new Configuration())
private val rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = null
- private var appAttemptId: ApplicationAttemptId = null
- private var reporterThread: Thread = null
+ private var resourceManager: AMRMProtocol = _
+ private var appAttemptId: ApplicationAttemptId = _
+ private var reporterThread: Thread = _
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- private var yarnAllocator: YarnAllocationHandler = null
+ private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed:Boolean = false
private val sparkConf = new SparkConf
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1
- var actor: ActorRef = null
+ var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor.
class MonitorActor(driverUrl: String) extends Actor {
- var driver: ActorSelection = null
+ var driver: ActorSelection = _
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
- driver ! "hello"
+ // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+ driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 6a90cc51cf..4f34bd913e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -51,7 +51,7 @@ class WorkerRunnable(
extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
- var cm: ContainerManager = null
+ var cm: ContainerManager = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c8af653b3f..c8af653b3f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f76a5ddd39..f76a5ddd39 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 7aac2328da..7aac2328da 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 5f159b073f..5f159b073f 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2ba2366ead..2ba2366ead 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 522e0a9ad7..522e0a9ad7 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f5078b..4b69f5078b 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index a4638cc863..a4638cc863 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
diff --git a/new-yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 2941356bc5..2941356bc5 100644
--- a/new-yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
diff --git a/yarn/pom.xml b/yarn/pom.xml
index bc64a190fd..aea8b0cdde 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -25,11 +25,10 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.10</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project YARN Support</name>
- <url>http://spark.incubator.apache.org/</url>
-
+ <artifactId>yarn-parent_2.10</artifactId>
+ <packaging>pom</packaging>
+ <name>Spark Project YARN Parent POM</name>
+
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -73,45 +72,52 @@
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>yarn-alpha</id>
+ <modules>
+ <module>alpha</module>
+ </modules>
+ </profile>
+
+ <profile>
+ <id>yarn</id>
+ <modules>
+ <module>stable</module>
+ </modules>
+ </profile>
+ </profiles>
+
<build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
- <phase>package</phase>
+ <id>add-scala-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ <source>../common/src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-scala-test-sources</id>
+ <phase>generate-test-sources</phase>
<goals>
- <goal>shade</goal>
+ <goal>add-test-source</goal>
</goals>
<configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>reference.conf</resource>
- </transformer>
- </transformers>
+ <sources>
+ <source>src/test/scala</source>
+ <source>../common/src/test/scala</source>
+ </sources>
</configuration>
</execution>
</executions>
@@ -150,12 +156,16 @@
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<environmentVariables>
- <SPARK_HOME>${basedir}/..</SPARK_HOME>
+ <SPARK_HOME>${basedir}/../..</SPARK_HOME>
<SPARK_TESTING>1</SPARK_TESTING>
<SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
</environmentVariables>
</configuration>
</plugin>
</plugins>
+
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
+
</project>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
deleted file mode 100644
index f76a5ddd39..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.util.IntParam
-import collection.mutable.ArrayBuffer
-
-class ApplicationMasterArguments(val args: Array[String]) {
- var userJar: String = null
- var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
-
- parseArgs(args.toList)
-
- private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer = new ArrayBuffer[String]()
-
- var args = inputArgs
-
- while (! args.isEmpty) {
-
- args match {
- case ("--jar") :: value :: tail =>
- userJar = value
- args = tail
-
- case ("--class") :: value :: tail =>
- userClass = value
- args = tail
-
- case ("--args") :: value :: tail =>
- userArgsBuffer += value
- args = tail
-
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
- args = tail
-
- case ("--worker-memory") :: IntParam(value) :: tail =>
- workerMemory = value
- args = tail
-
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
- args = tail
-
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
- case _ =>
- printUsageAndExit(1, args)
- }
- }
-
- userArgs = userArgsBuffer.readOnly
- }
-
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
- if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
- }
- System.err.println(
- "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
- System.exit(exitCode)
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
deleted file mode 100644
index 9075ca71e7..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.SparkConf
-import org.apache.spark.util.MemoryParam
-import org.apache.spark.util.IntParam
-import collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
-
-// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
- var addJars: String = null
- var files: String = null
- var archives: String = null
- var userJar: String = null
- var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
- var amQueue = new SparkConf().get("QUEUE", "default")
- var amMemory: Int = 512
- var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
- var appName: String = "Spark"
- // TODO
- var inputFormatInfo: List[InputFormatInfo] = null
-
- parseArgs(args.toList)
-
- private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
- val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
-
- var args = inputArgs
-
- while (! args.isEmpty) {
-
- args match {
- case ("--jar") :: value :: tail =>
- userJar = value
- args = tail
-
- case ("--class") :: value :: tail =>
- userClass = value
- args = tail
-
- case ("--args") :: value :: tail =>
- userArgsBuffer += value
- args = tail
-
- case ("--master-class") :: value :: tail =>
- amClass = value
- args = tail
-
- case ("--master-memory") :: MemoryParam(value) :: tail =>
- amMemory = value
- args = tail
-
- case ("--worker-memory") :: MemoryParam(value) :: tail =>
- workerMemory = value
- args = tail
-
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
- args = tail
-
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
- args = tail
-
- case ("--queue") :: value :: tail =>
- amQueue = value
- args = tail
-
- case ("--name") :: value :: tail =>
- appName = value
- args = tail
-
- case ("--addJars") :: value :: tail =>
- addJars = value
- args = tail
-
- case ("--files") :: value :: tail =>
- files = value
- args = tail
-
- case ("--archives") :: value :: tail =>
- archives = value
- args = tail
-
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
- case _ =>
- printUsageAndExit(1, args)
- }
- }
-
- userArgs = userArgsBuffer.readOnly
- inputFormatInfo = inputFormatMap.values.toList
- }
-
-
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
- if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
- }
- System.err.println(
- "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
- " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
- " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --name NAME The name of your application (Default: Spark)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
- " --files files Comma separated list of files to be distributed with the job.\n" +
- " --archives archives Comma separated list of archives to be distributed with the job."
- )
- System.exit(exitCode)
- }
-
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
deleted file mode 100644
index 5f159b073f..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import org.apache.spark.Logging
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.LinkedHashMap
-import scala.collection.mutable.Map
-
-
-/** Client side methods to setup the Hadoop distributed cache */
-class ClientDistributedCacheManager() extends Logging {
- private val distCacheFiles: Map[String, Tuple3[String, String, String]] =
- LinkedHashMap[String, Tuple3[String, String, String]]()
- private val distCacheArchives: Map[String, Tuple3[String, String, String]] =
- LinkedHashMap[String, Tuple3[String, String, String]]()
-
-
- /**
- * Add a resource to the list of distributed cache resources. This list can
- * be sent to the ApplicationMaster and possibly the workers so that it can
- * be downloaded into the Hadoop distributed cache for use by this application.
- * Adds the LocalResource to the localResources HashMap passed in and saves
- * the stats of the resources to they can be sent to the workers and verified.
- *
- * @param fs FileSystem
- * @param conf Configuration
- * @param destPath path to the resource
- * @param localResources localResource hashMap to insert the resource into
- * @param resourceType LocalResourceType
- * @param link link presented in the distributed cache to the destination
- * @param statCache cache to store the file/directory stats
- * @param appMasterOnly Whether to only add the resource to the app master
- */
- def addResource(
- fs: FileSystem,
- conf: Configuration,
- destPath: Path,
- localResources: HashMap[String, LocalResource],
- resourceType: LocalResourceType,
- link: String,
- statCache: Map[URI, FileStatus],
- appMasterOnly: Boolean = false) = {
- val destStatus = fs.getFileStatus(destPath)
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- amJarRsrc.setType(resourceType)
- val visibility = getVisibility(conf, destPath.toUri(), statCache)
- amJarRsrc.setVisibility(visibility)
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
- amJarRsrc.setTimestamp(destStatus.getModificationTime())
- amJarRsrc.setSize(destStatus.getLen())
- if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
- localResources(link) = amJarRsrc
-
- if (appMasterOnly == false) {
- val uri = destPath.toUri()
- val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
- if (resourceType == LocalResourceType.FILE) {
- distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- } else {
- distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
- destStatus.getModificationTime().toString(), visibility.name())
- }
- }
- }
-
- /**
- * Adds the necessary cache file env variables to the env passed in
- * @param env
- */
- def setDistFilesEnv(env: Map[String, String]) = {
- val (keys, tupleValues) = distCacheFiles.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
- }
- }
-
- /**
- * Adds the necessary cache archive env variables to the env passed in
- * @param env
- */
- def setDistArchivesEnv(env: Map[String, String]) = {
- val (keys, tupleValues) = distCacheArchives.unzip
- val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
- if (keys.size > 0) {
- env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
- timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
- sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
- visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
- }
- }
-
- /**
- * Returns the local resource visibility depending on the cache file permissions
- * @param conf
- * @param uri
- * @param statCache
- * @return LocalResourceVisibility
- */
- def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
- LocalResourceVisibility = {
- if (isPublic(conf, uri, statCache)) {
- return LocalResourceVisibility.PUBLIC
- }
- return LocalResourceVisibility.PRIVATE
- }
-
- /**
- * Returns a boolean to denote whether a cache file is visible to all(public)
- * or not
- * @param conf
- * @param uri
- * @param statCache
- * @return true if the path in the uri is visible to all, false otherwise
- */
- def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
- val fs = FileSystem.get(uri, conf)
- val current = new Path(uri.getPath())
- //the leaf level file should be readable by others
- if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
- return false
- }
- return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
- }
-
- /**
- * Returns true if all ancestors of the specified path have the 'execute'
- * permission set for all users (i.e. that other users can traverse
- * the directory heirarchy to the given path)
- * @param fs
- * @param path
- * @param statCache
- * @return true if all ancestors have the 'execute' permission set for all users
- */
- def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path,
- statCache: Map[URI, FileStatus]): Boolean = {
- var current = path
- while (current != null) {
- //the subdirs in the path should have execute permissions for others
- if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
- return false
- }
- current = current.getParent()
- }
- return true
- }
-
- /**
- * Checks for a given path whether the Other permissions on it
- * imply the permission in the passed FsAction
- * @param fs
- * @param path
- * @param action
- * @param statCache
- * @return true if the path in the uri is visible to all, false otherwise
- */
- def checkPermissionOfOther(fs: FileSystem, path: Path,
- action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache)
- val perms = status.getPermission()
- val otherAction = perms.getOtherAction()
- if (otherAction.implies(action)) {
- return true
- }
- return false
- }
-
- /**
- * Checks to see if the given uri exists in the cache, if it does it
- * returns the existing FileStatus, otherwise it stats the uri, stores
- * it in the cache, and returns the FileStatus.
- * @param fs
- * @param uri
- * @param statCache
- * @return FileStatus
- */
- def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
- val stat = statCache.get(uri) match {
- case Some(existstat) => existstat
- case None =>
- val newStat = fs.getFileStatus(new Path(uri))
- statCache.put(uri, newStat)
- newStat
- }
- return stat
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
deleted file mode 100644
index 2ba2366ead..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.conf.Configuration
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class YarnSparkHadoopUtil extends SparkHadoopUtil {
-
- // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
- override def isYarnMode(): Boolean = { true }
-
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
- // Always create a new config, dont reuse yarnConf.
- override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
-
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- override def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials()
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
deleted file mode 100644
index 522e0a9ad7..0000000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- *
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
- */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
-
- def this(sc: SparkContext) = this(sc, new Configuration())
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
- }
-
- override def postStartHook() {
-
- // The yarn application is running, but the worker might not yet ready
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- Thread.sleep(2000L)
- logInfo("YarnClientClusterScheduler.postStartHook done")
- }
-}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
deleted file mode 100644
index 4b69f5078b..0000000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-
-private[spark] class YarnClientSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
- with Logging {
-
- var client: Client = null
- var appId: ApplicationId = null
-
- override def start() {
- super.start()
-
- val defalutWorkerCores = "2"
- val defalutWorkerMemory = "512m"
- val defaultWorkerNumber = "1"
-
- val userJar = System.getenv("SPARK_YARN_APP_JAR")
- var workerCores = System.getenv("SPARK_WORKER_CORES")
- var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
- var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
-
- if (userJar == null)
- throw new SparkException("env SPARK_YARN_APP_JAR is not set")
-
- if (workerCores == null)
- workerCores = defalutWorkerCores
- if (workerMemory == null)
- workerMemory = defalutWorkerMemory
- if (workerNumber == null)
- workerNumber = defaultWorkerNumber
-
- val driverHost = conf.get("spark.driver.host")
- val driverPort = conf.get("spark.driver.port")
- val hostport = driverHost + ":" + driverPort
-
- val argsArray = Array[String](
- "--class", "notused",
- "--jar", userJar,
- "--args", hostport,
- "--worker-memory", workerMemory,
- "--worker-cores", workerCores,
- "--num-workers", workerNumber,
- "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
- )
-
- val args = new ClientArguments(argsArray)
- client = new Client(args)
- appId = client.runApp()
- waitForApp()
- }
-
- def waitForApp() {
-
- // TODO : need a better way to find out whether the workers are ready or not
- // maybe by resource usage report?
- while(true) {
- val report = client.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
- )
-
- // Ready to go, or already gone.
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.RUNNING) {
- return
- } else if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- throw new SparkException("Yarn application already ended," +
- "might be killed or not able to launch application master.")
- }
-
- Thread.sleep(1000)
- }
- }
-
- override def stop() {
- super.stop()
- client.stop()
- logInfo("Stoped")
- }
-
-}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index 2d9fbcb400..0000000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
- * ApplicationMaster, etc. is done
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
- extends TaskSchedulerImpl(sc) {
-
- logInfo("Created YarnClusterScheduler")
-
- def this(sc: SparkContext) = this(sc, new Configuration())
-
- // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
- // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
- // Subsequent creations are ignored - since nodes are already allocated by then.
-
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
- }
-
- override def postStartHook() {
- val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
- if (sparkContextInitialized){
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- Thread.sleep(3000L)
- }
- logInfo("YarnClusterScheduler.postStartHook done")
- }
-}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
deleted file mode 100644
index 2941356bc5..0000000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito.when
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-
-class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
-
- class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
- override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
- LocalResourceVisibility = {
- return LocalResourceVisibility.PRIVATE
- }
- }
-
- test("test getFileStatus empty") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath() === null)
- }
-
- test("test getFileStatus cached") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath().toString() === "/tmp/testing")
- }
-
- test("test addResource") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 0)
- assert(resource.getSize() === 0)
- assert(resource.getType() === LocalResourceType.FILE)
-
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
-
- //add another one and verify both there and order correct
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing2"))
- val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
- when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
- distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
- statCache, false)
- val resource2 = localResources("link2")
- assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
- assert(resource2.getTimestamp() === 10)
- assert(resource2.getSize() === 20)
- assert(resource2.getType() === LocalResourceType.FILE)
-
- val env2 = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env2)
- val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val files = env2("SPARK_YARN_CACHE_FILES").split(',')
- val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
- assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(timestamps(0) === "0")
- assert(sizes(0) === "0")
- assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
-
- assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
- assert(timestamps(1) === "10")
- assert(sizes(1) === "20")
- assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
- }
-
- test("test addResource link null") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- intercept[Exception] {
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
- statCache, false)
- }
- assert(localResources.get("link") === None)
- assert(localResources.size === 0)
- }
-
- test("test addResource appmaster only") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, true)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
- }
-
- test("test addResource archive") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val env = new HashMap[String, String]()
-
- distMgr.setDistArchivesEnv(env)
- assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
- }
-
-
-}
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
new file mode 100644
index 0000000000..62fe3e2742
--- /dev/null
+++ b/yarn/stable/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>yarn-parent_2.10</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project YARN Stable API</name>
+
+</project>
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7c32e0ab9b..7c32e0ab9b 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a75066888c..a75066888c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 99b824e129..9b898b5829 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -56,17 +56,19 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
// This actor just working as a monitor to watch on Driver Actor.
class MonitorActor(driverUrl: String) extends Actor {
- var driver: ActorSelection = null
+ var driver: ActorSelection = _
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
+ // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+ driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
override def receive = {
case x: DisassociatedEvent =>
- logInfo("Driver terminated or disconnected! Shutting down.")
+ logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
}
}
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 9f5523c4b9..9f5523c4b9 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 85ab08ef34..8a9a73f5b4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.{RackResolver, Records}
-object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
+object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
}