diff options
author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2014-09-23 11:40:14 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-09-23 11:40:14 -0500 |
commit | 11c10df825419372df61a8d23c51e8c3cc78047f (patch) | |
tree | 5a97988cc05ec1250a6ce1fd491852a9bdbc331a | |
parent | c4022dd52b4827323ff956632dc7623f546da937 (diff) | |
download | spark-11c10df825419372df61a8d23c51e8c3cc78047f.tar.gz spark-11c10df825419372df61a8d23c51e8c3cc78047f.tar.bz2 spark-11c10df825419372df61a8d23c51e8c3cc78047f.zip |
[SPARK-3304] [YARN] ApplicationMaster's Finish status is wrong when uncaught exception is thrown from ReporterThread
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #2198 from sarutak/SPARK-3304 and squashes the following commits:
2696237 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
5b80363 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
4eb0a3e [Kousuke Saruta] Removed the description about spark.yarn.scheduler.reporterThread.maxFailure
9741597 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
f7538d4 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
358ef8d [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304
0d138c6 [Kousuke Saruta] Revert "tmp"
f8da10a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
b6e9879 [Kousuke Saruta] tmp
8d256ed [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
13b2652 [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304
2711e15 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
c081f8e [Kousuke Saruta] Modified ApplicationMaster to handle exception in ReporterThread itself
0bbd3a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
a6982ad [Kousuke Saruta] Added ability handling uncaught exception thrown from Reporter thread
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 66 |
1 files changed, 54 insertions, 12 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index cde5fff637..9050808157 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -17,7 +17,10 @@ package org.apache.spark.deploy.yarn +import scala.util.control.NonFatal + import java.io.IOException +import java.lang.reflect.InvocationTargetException import java.net.Socket import java.util.concurrent.atomic.AtomicReference @@ -55,6 +58,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, @volatile private var finished = false @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED + @volatile private var userClassThread: Thread = _ private var reporterThread: Thread = _ private var allocator: YarnAllocator = _ @@ -221,18 +225,48 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // must be <= expiryInterval / 2. 
val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) + // The number of failures in a row until Reporter thread give up + val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) + val t = new Thread { override def run() { + var failureCount = 0 + while (!finished) { - checkNumExecutorsFailed() - if (!finished) { - logDebug("Sending progress") - allocator.allocateResources() - try { - Thread.sleep(interval) - } catch { - case e: InterruptedException => + try { + checkNumExecutorsFailed() + if (!finished) { + logDebug("Sending progress") + allocator.allocateResources() } + failureCount = 0 + } catch { + case e: Throwable => { + failureCount += 1 + if (!NonFatal(e) || failureCount >= reporterMaxFailures) { + logError("Exception was thrown from Reporter thread.", e) + finish(FinalApplicationStatus.FAILED, "Exception was thrown" + + s"${failureCount} time(s) from Reporter thread.") + + /** + * If exception is thrown from ReporterThread, + * interrupt user class to stop. + * Without this interrupting, if exception is + * thrown before allocating enough executors, + * YarnClusterScheduler waits until timeout even though + * we cannot allocate executors. + */ + logInfo("Interrupting user class to stop.") + userClassThread.interrupt + } else { + logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e) + } + } + } + try { + Thread.sleep(interval) + } catch { + case e: InterruptedException => } } } @@ -355,7 +389,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) - val t = new Thread { + userClassThread = new Thread { override def run() { var status = FinalApplicationStatus.FAILED try { @@ -366,15 +400,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // Some apps have "System.exit(0)" at the end. 
The user thread will stop here unless // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. status = FinalApplicationStatus.SUCCEEDED + } catch { + case e: InvocationTargetException => { + e.getCause match { + case _: InterruptedException => { + // Reporter thread can interrupt to stop user class + } + } + } } finally { logDebug("Finishing main") } finalStatus = status } } - t.setName("Driver") - t.start() - t + userClassThread.setName("Driver") + userClassThread.start() + userClassThread } // Actor used to monitor the driver when running in client deploy mode. |