aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2014-09-23 11:40:14 -0500
committerThomas Graves <tgraves@apache.org>2014-09-23 11:40:14 -0500
commit11c10df825419372df61a8d23c51e8c3cc78047f (patch)
tree5a97988cc05ec1250a6ce1fd491852a9bdbc331a /yarn
parentc4022dd52b4827323ff956632dc7623f546da937 (diff)
downloadspark-11c10df825419372df61a8d23c51e8c3cc78047f.tar.gz
spark-11c10df825419372df61a8d23c51e8c3cc78047f.tar.bz2
spark-11c10df825419372df61a8d23c51e8c3cc78047f.zip
[SPARK-3304] [YARN] ApplicationMaster's Finish status is wrong when uncaught exception is thrown from ReporterThread
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #2198 from sarutak/SPARK-3304 and squashes the following commits: 2696237 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 5b80363 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 4eb0a3e [Kousuke Saruta] Removed the description about spark.yarn.scheduler.reporterThread.maxFailure 9741597 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 f7538d4 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 358ef8d [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304 0d138c6 [Kousuke Saruta] Revert "tmp" f8da10a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 b6e9879 [Kousuke Saruta] tmp 8d256ed [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 13b2652 [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304 2711e15 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 c081f8e [Kousuke Saruta] Modified ApplicationMaster to handle exception in ReporterThread itself 0bbd3a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304 a6982ad [Kousuke Saruta] Added ability handling uncaught exception thrown from Reporter thread
Diffstat (limited to 'yarn')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala66
1 files changed, 54 insertions, 12 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index cde5fff637..9050808157 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,7 +17,10 @@
package org.apache.spark.deploy.yarn
+import scala.util.control.NonFatal
+
import java.io.IOException
+import java.lang.reflect.InvocationTargetException
import java.net.Socket
import java.util.concurrent.atomic.AtomicReference
@@ -55,6 +58,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
@volatile private var finished = false
@volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+ @volatile private var userClassThread: Thread = _
private var reporterThread: Thread = _
private var allocator: YarnAllocator = _
@@ -221,18 +225,48 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// must be <= expiryInterval / 2.
val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
+ // The number of consecutive failures after which the Reporter thread gives up
+ val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
+
val t = new Thread {
override def run() {
+ var failureCount = 0
+
while (!finished) {
- checkNumExecutorsFailed()
- if (!finished) {
- logDebug("Sending progress")
- allocator.allocateResources()
- try {
- Thread.sleep(interval)
- } catch {
- case e: InterruptedException =>
+ try {
+ checkNumExecutorsFailed()
+ if (!finished) {
+ logDebug("Sending progress")
+ allocator.allocateResources()
}
+ failureCount = 0
+ } catch {
+ case e: Throwable => {
+ failureCount += 1
+ if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+ logError("Exception was thrown from Reporter thread.", e)
+ finish(FinalApplicationStatus.FAILED, "Exception was thrown" +
+ s"${failureCount} time(s) from Reporter thread.")
+
+ /**
+ * If an exception is thrown from the Reporter thread,
+ * interrupt the user class thread to stop it.
+ * Without this interrupt, if the exception is
+ * thrown before enough executors have been allocated,
+ * YarnClusterScheduler waits until its timeout even though
+ * we cannot allocate executors.
+ */
+ logInfo("Interrupting user class to stop.")
+ userClassThread.interrupt
+ } else {
+ logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
+ }
+ }
+ }
+ try {
+ Thread.sleep(interval)
+ } catch {
+ case e: InterruptedException =>
}
}
}
@@ -355,7 +389,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- val t = new Thread {
+ userClassThread = new Thread {
override def run() {
var status = FinalApplicationStatus.FAILED
try {
@@ -366,15 +400,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Some apps have "System.exit(0)" at the end. The user thread will stop here unless
// it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
status = FinalApplicationStatus.SUCCEEDED
+ } catch {
+ case e: InvocationTargetException => {
+ e.getCause match {
+ case _: InterruptedException => {
+ // Reporter thread can interrupt to stop user class
+ }
+ }
+ }
} finally {
logDebug("Finishing main")
}
finalStatus = status
}
}
- t.setName("Driver")
- t.start()
- t
+ userClassThread.setName("Driver")
+ userClassThread.start()
+ userClassThread
}
// Actor used to monitor the driver when running in client deploy mode.