diff options
author | tgravescs <tgraves_cs@yahoo.com> | 2013-11-19 12:39:26 -0600 |
---|---|---|
committer | tgravescs <tgraves_cs@yahoo.com> | 2013-11-19 12:44:00 -0600 |
commit | 4093e9393aef95793f2d1d77fd0bbe80c8bb8d11 (patch) | |
tree | fc899a11dc2fd20ecadc947fd2e8fea44425d008 /yarn/src | |
parent | e2ebc3a9d8bca83bf842b134f2f056c1af0ad2be (diff) | |
download | spark-4093e9393aef95793f2d1d77fd0bbe80c8bb8d11.tar.gz spark-4093e9393aef95793f2d1d77fd0bbe80c8bb8d11.tar.bz2 spark-4093e9393aef95793f2d1d77fd0bbe80c8bb8d11.zip |
Impove Spark on Yarn Error handling
Diffstat (limited to 'yarn/src')
3 files changed, 58 insertions, 29 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 0e47bd7a10..89b00415da 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -52,7 +52,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - + // default to numWorkers * 2, with minimum of 3 + private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3).toString()).toInt def run() { // setup the directories so things go to yarn approved directories rather @@ -225,12 +227,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e if (null != sparkContext) { uiAddress = sparkContext.ui.appUIAddress - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args, sparkContext.preferredNodeLocationData) } else { logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime + - ", numTries = " + numTries) - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args) + ", numTries = " + numTries) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args) } } } finally { @@ -249,8 +252,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e while(yarnAllocator.getNumWorkersRunning < args.numWorkers && // If user thread exists, then quit ! userThread.isAlive) { - - this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } + yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } @@ -266,21 +272,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) launchReporterThread(interval) } } - // TODO: We might want to extend this to allocate more containers in case they die ! private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime val t = new Thread { override def run() { while (userThread.isAlive) { + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning if (missingWorkerCount > 0) { logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") @@ -319,7 +331,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } */ - def finishApplicationMaster(status: FinalApplicationStatus) { + def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") { synchronized { if (isFinished) { @@ -333,6 +345,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) + finishReq.setDiagnostics(diagnostics) // set tracking url to empty since we don't have a history server finishReq.setTrackingUrl("") resourceManager.finishApplicationMaster(finishReq) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c38bdd14ec..1078d5b826 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -60,6 +60,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) def run() { + validateArgs() + init(yarnConf) start() logClusterResourceDetails() @@ -84,6 +86,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl System.exit(0) } + def validateArgs() = { + Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!", + (args.userJar == null) -> "Error: You must specify a user jar!", + (args.userClass == null) -> "Error: You must specify a user class!", + (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!", + (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD), + (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString())) + .foreach { case(cond, errStr) => + if (cond) { + logError(errStr) + args.printUsageAndExit(1) + } + } + } + def getAppStagingDir(appId: ApplicationId): String = { SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR } @@ -97,7 +116,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size + ", queueChildQueueCount=" + queueInfo.getChildQueues.size) } - def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() @@ -215,11 +233,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - if (System.getenv("SPARK_JAR") == null || args.userJar == null) { - logError("Error: You must set SPARK_JAR environment variable and specify a user jar!") - System.exit(1) - } - Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => @@ -334,7 +347,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) @@ -360,11 +372,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl javaCommand = Environment.JAVA_HOME.$() + "/bin/java" } - if (args.userClass == null) { - logError("Error: You must specify a user class!") - System.exit(1) - } - val commands = List[String](javaCommand + " -server " + JAVA_OPTS + @@ -442,6 +449,7 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val args = new ClientArguments(argStrings) + new Client(args).run } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 25da9aa917..507a0743fd 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM // Used to generate a unique id per worker private val workerIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() + private val numWorkersFailed = new AtomicInteger() def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumWorkersFailed: Int = numWorkersFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) @@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else { // simply decrement count - next iteration of ReporterThread will take care of allocating ! numWorkersRunning.decrementAndGet() - logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState + - " httpaddress: " + completedContainer.getDiagnostics) + logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState + + " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus()) + + // Hadoop 2.2.X added a ContainerExitStatus we should switch to use + // there are some exit status' we shouldn't necessarily count against us, but for + // now I think its ok as none of the containers are expected to exit + if (completedContainer.getExitStatus() != 0) { + logInfo("Container marked as failed: " + containerId) + numWorkersFailed.incrementAndGet() + } } allocatedHostToContainersMap.synchronized { @@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val releasedContainerList = createReleasedContainerList() req.addAllReleases(releasedContainerList) - - if (numWorkers > 0) { logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.") } |