author    Andrew Or <andrewor14@gmail.com>  2014-10-24 13:32:23 -0700
committer Andrew Or <andrew@databricks.com>  2014-10-24 13:32:23 -0700
commit    b563987e8dffc2aed1a834d555589a41cfb2a706 (patch)
tree      83388d7800a06de587a6fcde0b5d8a131949b3c3 /core/src/main/scala/org
parent    098f83c7ccd7dad9f9228596da69fe5f55711a52 (diff)
[SPARK-4013] Do not create multiple actor systems on each executor
In the existing code, each coarse-grained executor has two concurrently running actor systems. This causes many more error messages to be logged than necessary when the executor is lost or killed, because we receive a disassociation event for each of these actor systems. This is blocking #2840.

Author: Andrew Or <andrewor14@gmail.com>

Closes #2863 from andrewor14/executor-actor-system and squashes the following commits:

44ce2e0 [Andrew Or] Avoid starting two actor systems on each executor
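For context, the fix boils down to a reuse-or-create decision: if the process already owns an actor system, hand it down instead of starting a second one. A minimal sketch of that pattern, assuming Akka on the classpath (the name getOrCreateActorSystem and the fallback system name "demo" are illustrative, not from the patch):

    import akka.actor.ActorSystem

    // Reuse the caller's actor system when one exists; otherwise start a new
    // one. Option(x) maps a null reference to None, the same null-tolerant
    // idiom SparkEnv.create uses in the diff below.
    def getOrCreateActorSystem(existing: ActorSystem = null): ActorSystem =
      Option(existing).getOrElse(ActorSystem("demo"))

With a single actor system per executor JVM, a lost or killed executor produces one disassociation event on the driver instead of two.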
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                           | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                               | 49
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala  | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                      | 11
4 files changed, 61 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 55602a9082..4565832334 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -209,16 +209,10 @@ class SparkContext(config: SparkConf) extends Logging {
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
- // Create the Spark execution environment (cache, map output tracker, etc)
conf.set("spark.executor.id", "driver")
- private[spark] val env = SparkEnv.create(
- conf,
- "<driver>",
- conf.get("spark.driver.host"),
- conf.get("spark.driver.port").toInt,
- isDriver = true,
- isLocal = isLocal,
- listenerBus = listenerBus)
+
+ // Create the Spark execution environment (cache, map output tracker, etc)
+ private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)
// Used to store a URL for each static file/jar together with the file's local timestamp
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 906a00b0bd..5c076e5f1c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -144,14 +144,46 @@ object SparkEnv extends Logging {
env
}
- private[spark] def create(
+ /**
+ * Create a SparkEnv for the driver.
+ */
+ private[spark] def createDriverEnv(
+ conf: SparkConf,
+ isLocal: Boolean,
+ listenerBus: LiveListenerBus): SparkEnv = {
+ assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+ assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
+ val hostname = conf.get("spark.driver.host")
+ val port = conf.get("spark.driver.port").toInt
+ create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+ }
+
+ /**
+ * Create a SparkEnv for an executor.
+ * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+ */
+ private[spark] def createExecutorEnv(
+ conf: SparkConf,
+ executorId: String,
+ hostname: String,
+ port: Int,
+ isLocal: Boolean,
+ actorSystem: ActorSystem = null): SparkEnv = {
+ create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+ }
+
+ /**
+ * Helper method to create a SparkEnv for a driver or an executor.
+ */
+ private def create(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
- listenerBus: LiveListenerBus = null): SparkEnv = {
+ listenerBus: LiveListenerBus = null,
+ defaultActorSystem: ActorSystem = null): SparkEnv = {
// Listener bus is only used on the driver
if (isDriver) {
@@ -159,9 +191,16 @@ object SparkEnv extends Logging {
}
val securityManager = new SecurityManager(conf)
- val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
- actorSystemName, hostname, port, conf, securityManager)
+
+ // If an existing actor system is already provided, use it.
+ // This is the case when an executor is launched in coarse-grained mode.
+ val (actorSystem, boundPort) =
+ Option(defaultActorSystem) match {
+ case Some(as) => (as, port)
+ case None =>
+ val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+ AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
+ }
// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
// This is so that we tell the executors the correct port to connect to.
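One detail worth noting in the hunk above: when defaultActorSystem is supplied, the caller's port is returned unchanged as boundPort, because the reused system was already bound by whoever created it. Only a freshly created system needs AkkaUtils.createActorSystem to report the port Akka actually took, which is what the comment just above about port 0 refers to.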
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c40a3e1667..697154d762 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import scala.concurrent.Await
-import akka.actor.{Actor, ActorSelection, Props}
+import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import akka.pattern.Patterns
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
@@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend(
executorId: String,
hostPort: String,
cores: Int,
- sparkProperties: Seq[(String, String)])
+ sparkProperties: Seq[(String, String)],
+ actorSystem: ActorSystem)
extends Actor with ActorLogReceive with ExecutorBackend with Logging {
Utils.checkHostPort(hostPort, "Expected hostport")
@@ -57,8 +58,8 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
- executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
- false)
+ val (hostname, _) = Utils.parseHostPort(hostPort)
+ executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -135,7 +136,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend],
- driverUrl, executorId, sparkHostPort, cores, props),
+ driverUrl, executorId, sparkHostPort, cores, props, actorSystem),
name = "Executor")
workerUrl.foreach { url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
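The backend change above threads the enclosing actor system into the actor through its Props arguments so it can later be handed to Executor. A stripped-down, runnable sketch of that wiring, with illustrative names (Backend, BackendDemo, "demo"):

    import akka.actor.{Actor, ActorSystem, Props}

    // An actor that receives a reference to the system it runs in as a plain
    // constructor argument via Props, mirroring how the patch forwards the
    // executor's actor system to CoarseGrainedExecutorBackend.
    class Backend(sharedSystem: ActorSystem) extends Actor {
      def receive = {
        case "registered" => println("will reuse " + sharedSystem.name)
      }
    }

    object BackendDemo extends App {
      val system = ActorSystem("demo")
      val backend = system.actorOf(Props(classOf[Backend], system), name = "Executor")
      backend ! "registered"
    }

Passing constructor arguments through Props(classOf[...], args) is what lets the same ActorSystem instance travel from the launcher, through the actor, into Executor without any global state.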
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b75b9b21f..70a46c75f4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal
+import akka.actor.ActorSystem
+
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
@@ -35,12 +37,14 @@ import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
+ * In coarse-grained mode, an existing actor system is provided.
*/
private[spark] class Executor(
executorId: String,
slaveHostname: String,
properties: Seq[(String, String)],
- isLocal: Boolean = false)
+ isLocal: Boolean = false,
+ actorSystem: ActorSystem = null)
extends Logging
{
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -77,8 +81,9 @@ private[spark] class Executor(
conf.set("spark.executor.id", "executor." + executorId)
private val env = {
if (!isLocal) {
- val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
- isDriver = false, isLocal = false)
+ val port = conf.getInt("spark.executor.port", 0)
+ val _env = SparkEnv.createExecutorEnv(
+ conf, executorId, slaveHostname, port, isLocal, actorSystem)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
_env
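A small side effect of the last hunk: the executor now honors spark.executor.port rather than always requesting an ephemeral port. The lookup is a plain SparkConf read with a fallback; a tiny illustration (the port value is made up):

    import org.apache.spark.SparkConf

    // getInt returns the supplied default when the key is unset; a port of 0
    // means "let the OS pick" if a fresh actor system must be created.
    val conf = new SparkConf(loadDefaults = false).set("spark.executor.port", "51000")
    val port = conf.getInt("spark.executor.port", 0)  // 51000 here, 0 if unset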