diff options
Diffstat (limited to 'yarn')
3 files changed, 58 insertions, 29 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 997a6dc1ec..a7baf0c36c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -54,7 +54,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - + // default to numWorkers * 2, with minimum of 3 + private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3).toString()).toInt def run() { // setup the directories so things go to yarn approved directories rather @@ -227,12 +229,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e if (null != sparkContext) { uiAddress = sparkContext.ui.appUIAddress - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args, sparkContext.preferredNodeLocationData) } else { logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime + - ", numTries = " + numTries) - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args) + ", numTries = " + numTries) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args) } } } finally { @@ -251,8 +254,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e while(yarnAllocator.getNumWorkersRunning < args.numWorkers && // If user thread exists, then quit ! userThread.isAlive) { - - this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } + yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } @@ -268,21 +274,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) launchReporterThread(interval) } } - // TODO: We might want to extend this to allocate more containers in case they die ! private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime val t = new Thread { override def run() { while (userThread.isAlive) { + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning if (missingWorkerCount > 0) { logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") @@ -321,7 +333,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } */ - def finishApplicationMaster(status: FinalApplicationStatus) { + def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") { synchronized { if (isFinished) { @@ -335,6 +347,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) + finishReq.setDiagnostics(diagnostics) // set tracking url to empty since we don't have a history server finishReq.setTrackingUrl("") resourceManager.finishApplicationMaster(finishReq) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 49a8cfde81..94e353af2e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -57,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) def run() { + validateArgs() + init(yarnConf) start() logClusterResourceDetails() @@ -81,6 +83,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl System.exit(0) } + def validateArgs() = { + Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!", + (args.userJar == null) -> "Error: You must specify a user jar!", + (args.userClass == null) -> "Error: You must specify a user class!", + (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!", + (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD), + (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString())) + .foreach { case(cond, errStr) => + if (cond) { + logError(errStr) + args.printUsageAndExit(1) + } + } + } + def getAppStagingDir(appId: ApplicationId): String = { SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR } @@ -94,7 +113,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size + ", queueChildQueueCount=" + queueInfo.getChildQueues.size) } - def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() @@ -212,11 +230,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - if (System.getenv("SPARK_JAR") == null || args.userJar == null) { - logError("Error: You must set SPARK_JAR environment variable and specify a user jar!") - System.exit(1) - } - Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => @@ -331,7 +344,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) @@ -357,11 +369,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl javaCommand = Environment.JAVA_HOME.$() + "/bin/java" } - if (args.userClass == null) { - logError("Error: You must specify a user class!") - System.exit(1) - } - val commands = List[String](javaCommand + " -server " + JAVA_OPTS + @@ -439,6 +446,7 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val args = new ClientArguments(argStrings) + new Client(args).run } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 25da9aa917..507a0743fd 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM // Used to generate a unique id per worker private val workerIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() + private val numWorkersFailed = new AtomicInteger() def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumWorkersFailed: Int = numWorkersFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) @@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else { // simply decrement count - next iteration of ReporterThread will take care of allocating ! numWorkersRunning.decrementAndGet() - logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState + - " httpaddress: " + completedContainer.getDiagnostics) + logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState + + " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus()) + + // Hadoop 2.2.X added a ContainerExitStatus we should switch to use + // there are some exit status' we shouldn't necessarily count against us, but for + // now I think its ok as none of the containers are expected to exit + if (completedContainer.getExitStatus() != 0) { + logInfo("Container marked as failed: " + containerId) + numWorkersFailed.incrementAndGet() + } } allocatedHostToContainersMap.synchronized { @@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val releasedContainerList = createReleasedContainerList() req.addAllReleases(releasedContainerList) - - if (numWorkers > 0) { logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.") } |