aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-12-02 11:42:53 -0800
committerPrashant Sharma <prashant.s@imaginea.com>2013-12-03 11:05:12 +0530
commit0f24576c08a361f323b7ad9babfd5d8431d57df0 (patch)
treeb1e299b28fbc0d0f89cb8b366bc4e5504801b4ab /core
parent5b11028a0479623f41e95a41825a9bdfc944b323 (diff)
downloadspark-0f24576c08a361f323b7ad9babfd5d8431d57df0.tar.gz
spark-0f24576c08a361f323b7ad9babfd5d8431d57df0.tar.bz2
spark-0f24576c08a361f323b7ad9babfd5d8431d57df0.zip
Cleanup and documentation of SparkActorSystem
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala114
1 files changed, 29 insertions, 85 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
index 461e7ab08f..d329063e43 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
@@ -2,111 +2,55 @@
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
+// Must be in akka.actor package as ActorSystemImpl is protected[akka].
package akka.actor
+import scala.util.control.{ControlThrowable, NonFatal}
+
import com.typesafe.config.Config
-import akka.util._
-import scala.util.control.{NonFatal, ControlThrowable}
/**
- * An actorSystem specific to spark. It has an additional feature of letting spark tolerate
- * fatal exceptions.
+ * An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]].
+ * The only change from the default system is that we do not shut down the ActorSystem
+ * in the event of a fatal exception. This is necessary as Spark is allowed to recover
+ * from fatal exceptions (see [[org.apache.spark.executor.Executor]]).
*/
object SparkActorSystem {
-
- def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader())
+ def apply(name: String, config: Config): ActorSystem =
+ apply(name, config, ActorSystem.findClassLoader())
def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
new SparkActorSystemImpl(name, config, classLoader).start()
-
- /**
- * INTERNAL API
- */
- private[akka] def findClassLoader(): ClassLoader = {
- def findCaller(get: Int ⇒ Class[_]): ClassLoader =
- Iterator.from(2 /*is the magic number, promise*/).map(get) dropWhile {
- c ⇒
- c != null &&
- (c.getName.startsWith("akka.actor.ActorSystem") ||
- c.getName.startsWith("scala.Option") ||
- c.getName.startsWith("scala.collection.Iterator") ||
- c.getName.startsWith("akka.util.Reflect"))
- } next() match {
- case null ⇒ getClass.getClassLoader
- case c ⇒ c.getClassLoader
- }
-
- Option(Thread.currentThread.getContextClassLoader) orElse
- (Reflect.getCallerClass map findCaller) getOrElse
- getClass.getClassLoader
- }
}
-private[akka] class SparkActorSystemImpl(override val name: String,
- applicationConfig: Config,
- classLoader: ClassLoader)
+private[akka] class SparkActorSystemImpl(
+ override val name: String,
+ applicationConfig: Config,
+ classLoader: ClassLoader)
extends ActorSystemImpl(name, applicationConfig, classLoader) {
- protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
+ protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+ val fallbackHandler = super.uncaughtExceptionHandler
+
new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
- cause match {
- case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable
- ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName)
- case _ ⇒
- if (settings.JvmExitOnFatalError) {
- try {
- log.error(cause, "Uncaught error from thread [{}] shutting down JVM since " +
- "'akka.jvm-exit-on-fatal-error' is enabled", thread.getName)
- import System.err
- err.print("Uncaught error from thread [")
- err.print(thread.getName)
- err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for " +
- "ActorSystem[")
- err.print(name)
- err.println("]")
- cause.printStackTrace(System.err)
- System.err.flush()
- } finally {
- System.exit(-1)
- }
- } else {
- log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
- "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
- //shutdown() //TODO make it configurable
- if (thread.isAlive) log.error("Thread is still alive")
- else {
- log.error("Thread is dead")
- }
- }
+ if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
+ log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
+ "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
+ //shutdown() //TODO make it configurable
+ } else {
+ fallbackHandler.uncaughtException(thread, cause)
}
}
}
-
- override def stop(actor: ActorRef): Unit = {
- val path = actor.path
- val guard = guardian.path
- val sys = systemGuardian.path
- path.parent match {
- case `guard` ⇒ guardian ! StopChild(actor)
- case `sys` ⇒ systemGuardian ! StopChild(actor)
- case _ ⇒ actor.asInstanceOf[InternalActorRef].stop()
- }
}
-
- override def /(actorName: String): ActorPath = guardian.path / actorName
-
- override def /(path: Iterable[String]): ActorPath = guardian.path / path
-
- private lazy val _start: this.type = {
- // the provider is expected to start default loggers, LocalActorRefProvider does this
- provider.init(this)
- this
+ def isFatalError(e: Throwable): Boolean = {
+ e match {
+ case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+ false
+ case _ =>
+ true
+ }
}
-
- override def start(): this.type = _start
-
- override def toString: String = lookupRoot.path.root.address.toString
-
}