From f80dcf2aeef762ca370e91d2c7d6e4f7894c3cd8 Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Fri, 24 Oct 2014 13:46:45 -0700
Subject: [SPARK-4067] refactor ExecutorUncaughtExceptionHandler

https://issues.apache.org/jira/browse/SPARK-4067

Currently we call Utils.tryOrExit everywhere:
AppClient
Executor
TaskSchedulerImpl
This makes the name ExecutorUncaughtExceptionHandler unfit for the real use case.

Author: Nan Zhu

Closes #2913 from CodingCat/SPARK-4067 and squashes the following commits:

035ee3d [Nan Zhu] make RAT happy
e62e416 [Nan Zhu] add some general Exit code
a10b63f [Nan Zhu] refactor
---
 .../scala/org/apache/spark/executor/Executor.scala |  6 +--
 .../apache/spark/executor/ExecutorExitCode.scala   | 12 +----
 .../ExecutorUncaughtExceptionHandler.scala         | 53 ----------------------
 .../org/apache/spark/util/SparkExitCode.scala      | 32 +++++++++++++
 .../spark/util/SparkUncaughtExceptionHandler.scala | 52 +++++++++++++++++++++
 .../main/scala/org/apache/spark/util/Utils.scala   |  4 +-
 6 files changed, 91 insertions(+), 68 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
 create mode 100644 core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
 create mode 100644 core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 70a46c75f4..2889f59e33 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -72,7 +72,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
   }
 
   val executorSource = new ExecutorSource(this, executorId)
@@ -258,7 +258,7 @@ private[spark] class Executor(
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.
           if (Utils.isFatalError(t)) {
-            ExecutorUncaughtExceptionHandler.uncaughtException(t)
+            SparkUncaughtExceptionHandler.uncaughtException(t)
           }
         }
       } finally {

diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 38be2c58b3..52862ae0ca 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import org.apache.spark.util.SparkExitCode._
+
 /**
  * These are exit codes that executors should use to provide the master with information about
  * executor failures assuming that cluster management framework can capture the exit codes (but
@@ -27,16 +29,6 @@ package org.apache.spark.executor
  */
 private[spark] object ExecutorExitCode {
-  /** The default uncaught exception handler was reached. */
-  val UNCAUGHT_EXCEPTION = 50
-
-  /** The default uncaught exception handler was called and an exception was encountered while
-      logging the exception. */
-  val UNCAUGHT_EXCEPTION_TWICE = 51
-
-  /** The default uncaught exception handler was reached, and the uncaught exception was an
-      OutOfMemoryError. */
-  val OOM = 52
 
   /** DiskStore failed to create a local temporary directory after many attempts. */
   val DISK_STORE_FAILED_TO_CREATE_DIR = 53
 

diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
deleted file mode 100644
index b0e984c039..0000000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
- */
-private[spark] object ExecutorUncaughtExceptionHandler
-  extends Thread.UncaughtExceptionHandler with Logging {
-
-  override def uncaughtException(thread: Thread, exception: Throwable) {
-    try {
-      logError("Uncaught exception in thread " + thread, exception)
-
-      // We may have been called from a shutdown hook. If so, we must not call System.exit().
-      // (If we do, we will deadlock.)
-      if (!Utils.inShutdown()) {
-        if (exception.isInstanceOf[OutOfMemoryError]) {
-          System.exit(ExecutorExitCode.OOM)
-        } else {
-          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-        }
-      }
-    } catch {
-      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-    }
-  }
-
-  def uncaughtException(exception: Throwable) {
-    uncaughtException(Thread.currentThread(), exception)
-  }
-}

diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
new file mode 100644
index 0000000000..c93b1cca9f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+private[spark] object SparkExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+
+}

diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
new file mode 100644
index 0000000000..ad3db1fbb5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.Logging
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object SparkUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(SparkExitCode.OOM)
+        } else {
+          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ccbddd985a..65bdbaae65 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,7 +43,7 @@ import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark._
-import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
+import org.apache.spark.util.SparkUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /** CallSite represents a place in user code. It can have a short and a long form. */
@@ -965,7 +965,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
--
cgit v1.2.3
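A note on the shutdown-hook comment in the handler above: System.exit() runs and then waits for all registered shutdown hooks, so calling it from inside a hook blocks forever on the hook machinery, while Runtime.getRuntime.halt() terminates the VM immediately without running hooks. That is why uncaughtException() guards on Utils.inShutdown() and why the catch block falls back to halt(). A minimal standalone sketch of this JVM behavior (not part of the patch; the object name is made up):

    // Demonstrates why System.exit() must not be called from a shutdown hook.
    object ShutdownHookSketch {
      def main(args: Array[String]): Unit = {
        Runtime.getRuntime.addShutdownHook(new Thread {
          override def run(): Unit = {
            println("shutdown hook running")
            // System.exit(1)          // would deadlock: exit() waits for this very hook
            Runtime.getRuntime.halt(1) // terminates immediately, skipping all hooks
          }
        })
        System.exit(0) // triggers the hook above
      }
    }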
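The three codes moved into SparkExitCode exist so that whatever supervises the process can tell why it died. A hypothetical sketch of how a supervisor might decode them (the explain helper is invented for illustration; SparkExitCode is private[spark], so this only compiles inside the org.apache.spark package tree):

    import org.apache.spark.util.SparkExitCode._

    // Uppercase stable identifiers are matched by equality in Scala patterns.
    def explain(code: Int): String = code match {
      case UNCAUGHT_EXCEPTION       => "uncaught exception"
      case UNCAUGHT_EXCEPTION_TWICE => "uncaught exception, and logging it also failed"
      case OOM                      => "uncaught OutOfMemoryError"
      case other                    => s"unknown exit code: $other"
    }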
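For context on the commit message: Utils.tryOrExit (shown in the Utils.scala hunk above) wraps a block so that anything other than a ControlThrowable escaping it is routed to SparkUncaughtExceptionHandler, which logs the error and exits the process, instead of silently killing the calling thread. A hedged sketch of a typical call site (the timer and task body are illustrative, not taken from AppClient, Executor, or TaskSchedulerImpl; it must live under org.apache.spark because tryOrExit is private[spark]):

    package org.apache.spark.demo

    import java.util.{Timer, TimerTask}
    import org.apache.spark.util.Utils

    object TryOrExitSketch {
      def main(args: Array[String]): Unit = {
        val timer = new Timer("demo-timer", true)
        timer.schedule(new TimerTask {
          override def run(): Unit = Utils.tryOrExit {
            // Periodic bookkeeping that must not fail silently: if this throws,
            // the whole process exits with a SparkExitCode rather than leaving
            // a stalled daemon thread behind.
            throw new IllegalStateException("simulated failure")
          }
        }, 0, 1000)
        Thread.sleep(5000)
      }
    }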