author    Aaron Davidson <aaron@databricks.com>  2013-12-06 00:16:40 -0800
committer Aaron Davidson <aaron@databricks.com>  2013-12-06 00:21:43 -0800
commit    5a864e3fce234d19e1b371d9bab40554293546bb (patch)
tree      06d07651d699b4b276e2a9e069d2ae7d678f5876 /core
parent    f6c8c1c7b686a010ffcec238db14eda34f1645f1 (diff)
parent    c9cd2af71eae4126536db790ceaffe0587da7d89 (diff)
Rename SparkActorSystem to IndestructibleActorSystem
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala (renamed from core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala) | 15
5 files changed, 23 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dcb12bed4e..debbdd4c44 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -97,7 +97,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+ indestructible = true)
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index e000531a26..e8fecec4a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -36,7 +36,7 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
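
For context, Akka 2.2's default remote transport expects actor paths under the akka.tcp:// scheme; with the older akka:// form, messages to the remote driver would not be delivered. A minimal sketch of the string the patched format call produces, using made-up host and port values (the real ones come from the spark.driver.host and spark.driver.port system properties, and "CoarseGrainedScheduler" is an assumed value for ACTOR_NAME):

// Illustrative values only; not taken from the patch.
val host = "192.168.0.1"
val port = "7077"
val actorName = "CoarseGrainedScheduler"  // assumed value of CoarseGrainedSchedulerBackend.ACTOR_NAME

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(host, port, actorName)
// => akka.tcp://spark@192.168.0.1:7077/user/CoarseGrainedScheduler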
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index b4451fc7b8..df33f6bfb0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -44,7 +44,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
* Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit.
*/
- def commit(): Long
+ def commit(): Long
/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 407e9ffe90..9f3f163277 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util
-import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem}
+import akka.actor.{IndestructibleActorSystem, ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.Await
@@ -34,8 +34,13 @@ private[spark] object AkkaUtils {
*
* Note: the `name` parameter is important, as even if a client sends a message to the right
* host + port, if the system name is incorrect, Akka will drop the message.
+ *
+ * If indestructible is set to true, the Actor System will continue running in the event
+ * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
- def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
+ : (ActorSystem, Int) = {
+
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
@@ -70,7 +75,11 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
""".stripMargin)
- val actorSystem = SparkActorSystem(name, akkaConf)
+ val actorSystem = if (indestructible) {
+ IndestructibleActorSystem(name, akkaConf)
+ } else {
+ ActorSystem(name, akkaConf)
+ }
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get
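
A short sketch of how callers use the new flag, matching the two call sites in this diff; the host and port values here are illustrative:

// Driver side: default flag, plain ActorSystem semantics (a fatal error shuts it down).
val (driverSystem, driverPort) = AkkaUtils.createActorSystem("spark", "localhost", 0)

// Executor side: opt in to the indestructible variant, as CoarseGrainedExecutorBackend does above.
val (executorSystem, executorPort) =
  AkkaUtils.createActorSystem("sparkExecutor", "localhost", 0, indestructible = true)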
diff --git a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index a679fd6142..69519860c6 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -10,20 +10,19 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.typesafe.config.Config
/**
- * An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]].
- * The only change from the default system is that we do not shut down the ActorSystem
- * in the event of a fatal exception. This is necessary as Spark is allowed to recover
- * from fatal exceptions (see [[org.apache.spark.executor.Executor]]).
+ * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * This is necessary as Spark Executors are allowed to recover from fatal exceptions
+ * (see [[org.apache.spark.executor.Executor]]).
*/
-object SparkActorSystem {
+object IndestructibleActorSystem {
def apply(name: String, config: Config): ActorSystem =
apply(name, config, ActorSystem.findClassLoader())
def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
- new SparkActorSystemImpl(name, config, classLoader).start()
+ new IndestructibleActorSystemImpl(name, config, classLoader).start()
}
-private[akka] class SparkActorSystemImpl(
+private[akka] class IndestructibleActorSystemImpl(
override val name: String,
applicationConfig: Config,
classLoader: ClassLoader)
@@ -36,7 +35,7 @@ private[akka] class SparkActorSystemImpl(
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
- "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
+ "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
//shutdown() //TODO make it configurable
} else {
fallbackHandler.uncaughtException(thread, cause)
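
The mechanism behind this handler can be illustrated with plain JVM primitives: install a Thread.UncaughtExceptionHandler that logs fatal errors instead of letting them tear the process down. The sketch below is a simplified stand-in for IndestructibleActorSystemImpl, not its actual code:

import java.lang.Thread.UncaughtExceptionHandler

object TolerantHandlerSketch {
  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      // Simulate a fatal error of the kind a Spark Executor is allowed to recover from.
      def run(): Unit = throw new OutOfMemoryError("simulated fatal error")
    })
    worker.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      def uncaughtException(t: Thread, cause: Throwable): Unit =
        // Tolerate and continue rather than shutting down, mirroring the
        // branch taken above when isFatalError(cause) is true.
        System.err.println(s"Uncaught fatal error from thread [${t.getName}]: $cause")
    })
    worker.start()
    worker.join()
    println("JVM survived the fatal error; the system keeps running.")
  }
}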