aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-09-20 15:03:55 -0700
committerReynold Xin <reynoldx@gmail.com>2013-09-20 15:03:55 -0700
commit119de80294bd0cb82855bd1982c5371b661b6fd5 (patch)
tree7038c25bfab1d4501fc2df7bd011117c7b86f8c3 /yarn
parenta106ed8b97e707b36818c11d1d7211fa28636178 (diff)
parentcd7222c3dd2211ce790fa52110db911b862bb63b (diff)
downloadspark-119de80294bd0cb82855bd1982c5371b661b6fd5.tar.gz
spark-119de80294bd0cb82855bd1982c5371b661b6fd5.tar.bz2
spark-119de80294bd0cb82855bd1982c5371b661b6fd5.zip
Merge branch 'master' of github.com:mesos/spark
Diffstat (limited to 'yarn')
-rw-r--r--yarn/pom.xml66
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala78
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala5
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala3
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala13
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala3
8 files changed, 107 insertions, 65 deletions
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 654b5bcd2d..21b650d1ea 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -30,6 +30,39 @@
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+ </dependencies>
+
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
@@ -75,37 +108,4 @@
</plugin>
</plugins>
</build>
-
- <profiles>
- <profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
</project>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 139a977a03..858b58d338 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import scala.collection.JavaConversions._
-import org.apache.spark.{SparkContext, Logging, Utils}
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.util.Utils
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
@@ -45,6 +46,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
+ private var uiAddress: String = ""
+
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -53,27 +56,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ // Workaround until hadoop moves to something which has
+ // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
+ // ignore result
+ // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
+ // Hence args.workerCores = numCore disabled above. Any better option ?
- if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ // Compute number of threads for akka
+ //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ //if (minimumMemory > 0) {
+ // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
- if (numCore > 0) {
+ // if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
// TODO: Uncomment when hadoop is on a version which has this fixed.
// args.workerCores = numCore
- }
- }
-
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406
- // ignore result
- // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option ?
+ // }
+ //}
// org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
ApplicationMaster.register(this)
@@ -83,6 +84,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
+
+ waitForSparkContextInitialized()
+
+ // do this after spark master is up and SparkContext is created so that we can register UI Url
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Allocate all containers
allocateWorkers()
@@ -134,8 +140,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
+ appMasterRequest.setTrackingUrl(uiAddress)
return resourceManager.registerApplicationMaster(appMasterRequest)
}
@@ -143,7 +148,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- while(!driverUp && tries < 10) {
+ val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ while(!driverUp && tries < numTries) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
@@ -189,24 +195,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
return t
}
- private def allocateWorkers() {
+ // this need to happen before allocateWorkers
+ private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
-
try {
var sparkContext: SparkContext = null
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
- while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+ val waitTime = 10000L
+ val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
- ApplicationMaster.sparkContextRef.wait(10000L)
+ ApplicationMaster.sparkContextRef.wait(waitTime)
}
sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+ assert(sparkContext != null || count >= numTries)
+
+ if (null != sparkContext) {
+ uiAddress = sparkContext.ui.appUIAddress
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
+ sparkContext.preferredNodeLocationData)
+ } else {
+ logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ }
}
+ } finally {
+ // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ }
+
+ private def allocateWorkers() {
+ try {
logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +324,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ // set tracking url to empty since we don't have a history server
+ finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f47e23b63f..f76a5ddd39 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -80,7 +80,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
System.err.println(
- "Usage: spark.deploy.yarn.ApplicationMaster [options] \n" +
+ "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 48e737ed79..844c707834 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.spark.deploy.SparkHadoopUtil
@@ -254,7 +255,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
- " spark.deploy.yarn.ApplicationMaster" +
+ " org.apache.spark.deploy.yarn.ApplicationMaster" +
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 6cbfadc23b..cd651904d2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -98,7 +98,7 @@ class ClientArguments(val args: Array[String]) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
System.err.println(
- "Usage: spark.deploy.yarn.Client [options] \n" +
+ "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 72dcf7178e..6229167cb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 26ff214e12..6d6ef149cc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,7 +17,8 @@
package org.apache.spark.deploy.yarn
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
import org.apache.spark.scheduler.SplitInfo
import scala.collection
import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
@@ -479,6 +480,15 @@ object YarnAllocationHandler {
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+ def newAllocator(conf: Configuration,
+ resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+ new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
+ args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+ }
+
def newAllocator(conf: Configuration,
resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
@@ -486,7 +496,6 @@ object YarnAllocationHandler {
val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
args.workerMemory, args.workerCores, hostToCount, rackToCount)
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 3828ddfc4f..29b3f22e13 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
/**
@@ -27,6 +28,8 @@ import org.apache.hadoop.conf.Configuration
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+ logInfo("Created YarnClusterScheduler")
+
def this(sc: SparkContext) = this(sc, new Configuration())
// Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate