diff options
4 files changed, 17 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index fcd76ec527..49059de50b 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -110,6 +110,11 @@ private[spark] class CoarseGrainedExecutorBackend( case StopExecutor => logInfo("Driver commanded a shutdown") + // Cannot shutdown here because an ack may need to be sent back to the caller. So send + // a message to self to actually do the shutdown. + self.send(Shutdown) + + case Shutdown => executor.stop() stop() rpcEnv.shutdown() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index d947436777..e0d25dc50c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -100,4 +100,11 @@ private[spark] object CoarseGrainedClusterMessages { case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage + // Used internally by executors to shut themselves down. + case object Shutdown extends CoarseGrainedClusterMessage + + // SPARK-10987: workaround for netty RPC issue; forces a connection from the driver back + // to the AM. + case object DriverHello extends CoarseGrainedClusterMessage + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index e0107f9d3d..38218b9c08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -170,6 +170,8 @@ private[spark] abstract class YarnSchedulerBackend( case RegisterClusterManager(am) => logInfo(s"ApplicationMaster registered as $am") amEndpoint = Option(am) + // See SPARK-10987. + am.send(DriverHello) case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index a2ccdc05d7..3791eea5bf 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -564,6 +564,9 @@ private[spark] class ApplicationMaster( case x: AddWebUIFilter => logInfo(s"Add WebUI Filter. $x") driver.send(x) + + case DriverHello => + // SPARK-10987: no action needed for this message. } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { |