 core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala         | 5 +++++
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 7 +++++++
 core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala        | 2 ++
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                 | 3 +++
 4 files changed, 17 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index fcd76ec527..49059de50b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -110,6 +110,11 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     case StopExecutor =>
       logInfo("Driver commanded a shutdown")
+      // Cannot shut down here because an ack may need to be sent back to the caller. So send
+      // a message to self to actually do the shutdown.
+      self.send(Shutdown)
+
+    case Shutdown =>
       executor.stop()
       stop()
       rpcEnv.shutdown()
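Why the indirection? Replying to StopExecutor requires the RPC machinery to still be alive, so the endpoint acks first and defers the actual teardown until the Shutdown message it posts to itself is processed on a later dispatch. Below is a minimal, runnable sketch of the same defer-via-self-send pattern, using a toy blocking-queue mailbox rather than Spark's private RpcEnv; all names in it are illustrative, not Spark APIs.

    import java.util.concurrent.LinkedBlockingQueue

    object DeferredShutdownSketch {
      sealed trait Msg
      case object StopExecutor extends Msg
      case object Shutdown extends Msg

      // Toy stand-in for an RPC endpoint's inbox.
      private val inbox = new LinkedBlockingQueue[Msg]()
      def send(m: Msg): Unit = inbox.put(m)

      def main(args: Array[String]): Unit = {
        send(StopExecutor)
        var running = true
        while (running) {
          inbox.take() match {
            case StopExecutor =>
              // Ack the caller while the endpoint is still alive, then ask
              // ourselves to tear down on a later loop iteration.
              println("ack sent to caller")
              send(Shutdown)
            case Shutdown =>
              println("tearing down executor, endpoint, and RPC env")
              running = false
          }
        }
      }
    }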
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index d947436777..e0d25dc50c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -100,4 +100,11 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
 
+  // Used internally by executors to shut themselves down.
+  case object Shutdown extends CoarseGrainedClusterMessage
+
+  // SPARK-10987: workaround for a netty RPC issue; forces a connection from the driver back
+  // to the AM.
+  case object DriverHello extends CoarseGrainedClusterMessage
+
 }
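Both new messages are case objects: singleton, immutable, trivially serializable, and cheap to pattern-match, which is all a pure control-plane signal needs. A simplified sketch of how such marker messages behave (a stand-in hierarchy, not the real private[spark] one):

    object MarkerMessageSketch {
      // Simplified mirror of CoarseGrainedClusterMessage; illustrative only.
      sealed trait ClusterMessage extends Serializable
      case object Shutdown extends ClusterMessage
      case object DriverHello extends ClusterMessage

      def describe(msg: ClusterMessage): String = msg match {
        case Shutdown    => "executor should stop itself"
        case DriverHello => "driver has opened a connection to the AM"
      }

      def main(args: Array[String]): Unit = {
        Seq(Shutdown, DriverHello).foreach(m => println(describe(m)))
      }
    }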
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e0107f9d3d..38218b9c08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -170,6 +170,8 @@ private[spark] abstract class YarnSchedulerBackend(
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
         amEndpoint = Option(am)
+        // See SPARK-10987.
+        am.send(DriverHello)
 
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
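The DriverHello send looks like a no-op, but its side effect is the point: under the netty RPC transport, sending the first message is what establishes an outbound connection from the driver back to the AM (SPARK-10987), so later control messages have a working channel. A toy sketch of that "pay the connection cost eagerly" idea, using hypothetical names rather than Spark's transport API:

    object EagerConnectSketch {
      // Hypothetical lazily-connecting channel; not a Spark class.
      final class LazyChannel(remote: String) {
        private var connected = false
        def send(msg: String): Unit = {
          if (!connected) {
            println(s"opening connection to $remote") // happens on first send only
            connected = true
          }
          println(s"-> $msg")
        }
      }

      def main(args: Array[String]): Unit = {
        val am = new LazyChannel("appMaster:0")
        am.send("DriverHello")        // payload is a no-op; forces the connection
        am.send("KillExecutors(...)") // later messages reuse the channel
      }
    }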
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a2ccdc05d7..3791eea5bf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -564,6 +564,9 @@ private[spark] class ApplicationMaster(
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")
         driver.send(x)
+
+      case DriverHello =>
+        // SPARK-10987: no action needed for this message.
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
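The explicit no-op case on the AM side matters because receive is a PartialFunction: without a matching case, DriverHello would not be handled and the RPC layer would report it as an unsupported message rather than silently dropping it. A quick, runnable illustration of that property:

    object NoOpCaseSketch {
      sealed trait Msg
      case object DriverHello extends Msg
      case object SomethingElse extends Msg

      // Mirrors the shape of an RpcEndpoint.receive handler.
      val receive: PartialFunction[Msg, Unit] = {
        case DriverHello => () // explicit no-op: the message counts as handled
      }

      def main(args: Array[String]): Unit = {
        println(receive.isDefinedAt(DriverHello))   // true
        println(receive.isDefinedAt(SomethingElse)) // false: would surface as
                                                    // an unsupported message
      }
    }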