aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-05-15 18:06:01 -0700
committerPatrick Wendell <patrick@databricks.com>2015-05-15 18:06:01 -0700
commit2c04c8a1aed34cce420b3d30d9e885daa6e03d74 (patch)
treefc851d926b416a96ef4313e616afbfa409d4aab1 /core
parente74545647684b3047248ca3cfee894ac5378dead (diff)
downloadspark-2c04c8a1aed34cce420b3d30d9e885daa6e03d74.tar.gz
spark-2c04c8a1aed34cce420b3d30d9e885daa6e03d74.tar.bz2
spark-2c04c8a1aed34cce420b3d30d9e885daa6e03d74.zip
[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver
This fixes a bug where an executor that exits can cause the driver's OutputCommitCoordinator to stop. To fix this, we use an `isDriver` flag and check it in `stop()`. See https://issues.apache.org/jira/browse/SPARK-7563 for more details. Author: Josh Rosen <joshrosen@databricks.com> Closes #6197 from JoshRosen/SPARK-7563 and squashes the following commits: 04b2cc5 [Josh Rosen] [SPARK-7563] OutputCommitCoordinator.stop() should only be executed on the driver
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala2
3 files changed, 8 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a5d831c7e6..3271145428 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -379,7 +379,7 @@ object SparkEnv extends Logging {
}
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
- new OutputCommitCoordinator(conf)
+ new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 0b1d47cff3..8321037cdc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -38,7 +38,7 @@ private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttem
* This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
* for an extensive design discussion.
*/
-private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
// Initialized by SparkEnv
var coordinatorRef: Option[RpcEndpointRef] = None
@@ -129,9 +129,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
}
def stop(): Unit = synchronized {
- coordinatorRef.foreach(_ send StopCoordinator)
- coordinatorRef = None
- authorizedCommittersByStage.clear()
+ if (isDriver) {
+ coordinatorRef.foreach(_ send StopCoordinator)
+ coordinatorRef = None
+ authorizedCommittersByStage.clear()
+ }
}
// Marked private[scheduler] instead of private so this can be mocked in tests
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index cf97707946..7078a7a122 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -81,7 +81,7 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
- outputCommitCoordinator = spy(new OutputCommitCoordinator(conf))
+ outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))