aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala10
1 files changed, 6 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index f748f394d1..17055e2f22 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -156,14 +156,16 @@ private[spark] object OutputCommitCoordinator {
override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
extends RpcEndpoint with Logging {
+ override def receive: PartialFunction[Any, Unit] = {
+ case StopCoordinator =>
+ logInfo("OutputCommitCoordinator stopped!")
+ stop()
+ }
+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
context.reply(
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
- case StopCoordinator =>
- logInfo("OutputCommitCoordinator stopped!")
- context.reply(true)
- stop()
}
}
}