path: root/core/src/test
author     mcheah <mcheah@palantir.com>         2015-02-10 20:12:18 -0800
committer  Andrew Or <andrew@databricks.com>    2015-02-10 20:12:18 -0800
commit     1cb37700753437045b15c457b983532cd5a27fa5 (patch)
tree       6a905414febe922ef422e2ac5b50351c94ea6bf1 /core/src/test
parent     7e24249af1e2f896328ef0402fa47db78cb6f9ec (diff)
download   spark-1cb37700753437045b15c457b983532cd5a27fa5.tar.gz
           spark-1cb37700753437045b15c457b983532cd5a27fa5.tar.bz2
           spark-1cb37700753437045b15c457b983532cd5a27fa5.zip
[SPARK-4879] Use driver to coordinate Hadoop output committing for speculative tasks
Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is enabled, this can sometimes result in multiple tasks committing their output to the same file. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well. This can cause strange race conditions where multiple committing tasks interfere with each other, with the result being that some partition files are lost entirely. For more context on these kinds of scenarios, see SPARK-4879.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks request commits. Duplicate tasks that would write to files already written to by other tasks are prohibited from committing.

This patch emulates that functionality. The crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver, and the executors can obtain a reference to this actor and request its permission to commit. As tasks are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well, so that it can update its internal state.

Future work includes more rigorous unit testing and extra optimizations should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It is also important for those hitting this issue to backport this onto previous versions of Spark, because the bug has serious consequences: data is lost.

Currently, the OutputCommitCoordinator is only used when `spark.speculation` is true. It can be disabled by setting `spark.hadoop.outputCommitCoordination.enabled=false` in SparkConf.

This patch is an updated version of #4155 (by mccheah), which in turn was an updated version of this PR.

Closes #4155.

Author: mcheah <mcheah@palantir.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #4066 from JoshRosen/SPARK-4879-sparkhadoopwriter-fix and squashes the following commits:

658116b [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
ed783b2 [Josh Rosen] Address Andrew’s feedback.
e7be65a [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
14861ea [Josh Rosen] splitID -> partitionID in a few places
ed8b554 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
48d5c1c [Josh Rosen] Roll back copiesRunning change in TaskSetManager
3969f5f [Josh Rosen] Re-enable guarding of commit coordination with spark.speculation setting.
ede7590 [Josh Rosen] Add test to ensure that a job that denies all commits cannot complete successfully.
97da5fe [Josh Rosen] Use actor only for RPC; call methods directly in DAGScheduler.
f582574 [Josh Rosen] Some cleanup in OutputCommitCoordinatorSuite
a7c0e29 [Josh Rosen] Create fake TaskInfo using dummy fields instead of Mockito.
997b41b [Josh Rosen] Roll back unnecessary DAGSchedulerSingleThreadedProcessLoop refactoring:
459310a [Josh Rosen] Roll back TaskSetManager changes that broke other tests.
dd00b7c [Josh Rosen] Move CommitDeniedException to executors package; remove `@DeveloperAPI` annotation.
c79df98 [Josh Rosen] Some misc. code style + doc changes:
f7d69c5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
92e6dc9 [Josh Rosen] Bug fix: use task ID instead of StageID to index into authorizedCommitters.
b344bad [Josh Rosen] (Temporarily) re-enable “always coordinate” for testing purposes.
0aec91e [Josh Rosen] Only coordinate when speculation is enabled; add configuration option to bypass new coordination.
594e41a [mcheah] Fixing a scalastyle error
60a47f4 [mcheah] Writing proper unit test for OutputCommitCoordinator and fixing bugs.
d63f63f [mcheah] Fixing compiler error
9fe6495 [mcheah] Fixing scalastyle
1df2a91 [mcheah] Throwing exception if SparkHadoopWriter commit denied
d431144 [mcheah] Using more concurrency to process OutputCommitCoordinator requests.
c334255 [mcheah] Properly handling messages that could be sent after actor shutdown.
8d5a091 [mcheah] Was mistakenly serializing the accumulator in test suite.
9c6a4fa [mcheah] More OutputCommitCoordinator cleanup on stop()
78eb1b5 [mcheah] Better OutputCommitCoordinatorActor stopping; simpler canCommit
83de900 [mcheah] Making the OutputCommitCoordinatorMessage serializable
abc7db4 [mcheah] TaskInfo can't be null in DAGSchedulerSuite
f135a8e [mcheah] Moving the output commit coordinator from class into method.
1c2b219 [mcheah] Renaming oudated names for test function classes
66a71cd [mcheah] Removing whitespace modifications
6b543ba [mcheah] Removing redundant accumulator in unit test
c9decc6 [mcheah] Scalastyle fixes
bc80770 [mcheah] Unit tests for OutputCommitCoordinator
6e6f748 [mcheah] [SPARK-4879] Use the Spark driver to authorize Hadoop commits.
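
The coordination described above boils down to "the first attempt that asks to commit a given partition wins; later duplicate or speculative attempts are denied, and a failed winner frees the slot for a retry." The standalone Scala sketch below models only that bookkeeping; the class, method names, and signatures are illustrative and are not the OutputCommitCoordinator API added by this patch, where the equivalent state lives on the driver and executors reach it through an actor.

import scala.collection.mutable

class NaiveCommitArbiter {
  // (stageId, partitionId) -> attempt number currently authorized to commit
  private val authorized = mutable.Map.empty[(Int, Int), Int]

  // A task would call this before committing its output. The first attempt to ask for a
  // given partition wins; duplicate/speculative attempts for the same partition are denied.
  def canCommit(stageId: Int, partitionId: Int, attemptNumber: Int): Boolean = synchronized {
    authorized.get((stageId, partitionId)) match {
      case Some(winner) => winner == attemptNumber
      case None =>
        authorized((stageId, partitionId)) = attemptNumber
        true
    }
  }

  // The scheduler would call this when an authorized attempt later fails, so that a
  // retry of the same partition can be granted permission to commit.
  def attemptFailed(stageId: Int, partitionId: Int, attemptNumber: Int): Unit = synchronized {
    if (authorized.get((stageId, partitionId)).exists(_ == attemptNumber)) {
      authorized.remove((stageId, partitionId))
    }
  }
}

object NaiveCommitArbiterDemo extends App {
  val arbiter = new NaiveCommitArbiter
  println(arbiter.canCommit(0, 0, attemptNumber = 0)) // true: first attempt wins
  println(arbiter.canCommit(0, 0, attemptNumber = 1)) // false: speculative duplicate is denied
  arbiter.attemptFailed(0, 0, attemptNumber = 0)      // authorized attempt failed; slot is freed
  println(arbiter.canCommit(0, 0, attemptNumber = 1)) // true: retry may now commit
}
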
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala            |  25
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala |  213
2 files changed, 230 insertions, 8 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index eb116213f6..9d0c127369 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -208,7 +208,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
}
}
}
@@ -219,7 +219,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
- Map[Long, Any]((accumId, 1)), null, null))
+ Map[Long, Any]((accumId, 1)), createFakeTaskInfo(), null))
}
}
}
@@ -476,7 +476,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null,
Map[Long, Any](),
- null,
+ createFakeTaskInfo(),
null))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(1))
@@ -487,7 +487,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
null,
Map[Long, Any](),
- null,
+ createFakeTaskInfo(),
null))
// The SparkListener should not receive redundant failure events.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
@@ -507,14 +507,14 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(newEpoch > oldEpoch)
val taskSet = taskSets(0)
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
// should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -766,5 +766,14 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(scheduler.shuffleToMapStage.isEmpty)
assert(scheduler.waitingStages.isEmpty)
}
+
+ // Nothing in this test should break if the task info's fields are null, but
+ // OutputCommitCoordinator requires the task info itself to not be null.
+ private def createFakeTaskInfo(): TaskInfo = {
+ val info = new TaskInfo(0, 0, 0, 0L, "", "", TaskLocality.ANY, false)
+ info.finishTime = 1 // to prevent spurious errors in JobProgressListener
+ info
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
new file mode 100644
index 0000000000..3cc860caa1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.File
+import java.util.concurrent.TimeoutException
+
+import org.mockito.Matchers
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter}
+
+import org.apache.spark._
+import org.apache.spark.rdd.{RDD, FakeOutputCommitter}
+import org.apache.spark.util.Utils
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/**
+ * Unit tests for the output commit coordination functionality.
+ *
+ * The unit test makes both the original task and the speculated task
+ * attempt to commit, where committing is emulated by creating a
+ * directory. If both tasks create directories then the end result is
+ * a failure.
+ *
+ * Note that there are some aspects of this test that are less than ideal.
+ * In particular, the test mocks the speculation-dequeuing logic to always
+ * dequeue a task and consider it as speculated. Immediately after initially
+ * submitting the tasks and calling reviveOffers(), reviveOffers() is invoked
+ * again to pick up the speculated task. This may be hacking the original
+ * behavior in too much of an unrealistic fashion.
+ *
+ * Also, the validation is done by checking the number of files in a directory.
+ * Ideally, an accumulator would be used for this, where we could increment
+ * the accumulator in the output committer's commitTask() call. If the call to
+ * commitTask() was called twice erroneously then the test would ideally fail because
+ * the accumulator would be incremented twice.
+ *
+ * The problem with this test implementation is that when both a speculated task and
+ * its original counterpart complete, only one of the accumulator's increments is
+ * captured. This results in a paradox where if the OutputCommitCoordinator logic
+ * was not in SparkHadoopWriter, the tests would still pass because only one of the
+ * increments would be captured even though the commit in both tasks was executed
+ * erroneously.
+ */
+class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
+
+ var outputCommitCoordinator: OutputCommitCoordinator = null
+ var tempDir: File = null
+ var sc: SparkContext = null
+
+ before {
+ tempDir = Utils.createTempDir()
+ val conf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
+ .set("spark.speculation", "true")
+ sc = new SparkContext(conf) {
+ override private[spark] def createSparkEnv(
+ conf: SparkConf,
+ isLocal: Boolean,
+ listenerBus: LiveListenerBus): SparkEnv = {
+ outputCommitCoordinator = spy(new OutputCommitCoordinator(conf))
+ // Use Mockito.spy() to maintain the default infrastructure everywhere else.
+ // This mocking allows us to control the coordinator responses in test cases.
+ SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))
+ }
+ }
+ // Use Mockito.spy() to maintain the default infrastructure everywhere else
+ val mockTaskScheduler = spy(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl])
+
+ doAnswer(new Answer[Unit]() {
+ override def answer(invoke: InvocationOnMock): Unit = {
+ // Submit the tasks, then force the task scheduler to dequeue the
+ // speculated task
+ invoke.callRealMethod()
+ mockTaskScheduler.backend.reviveOffers()
+ }
+ }).when(mockTaskScheduler).submitTasks(Matchers.any())
+
+ doAnswer(new Answer[TaskSetManager]() {
+ override def answer(invoke: InvocationOnMock): TaskSetManager = {
+ val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet]
+ new TaskSetManager(mockTaskScheduler, taskSet, 4) {
+ var hasDequeuedSpeculatedTask = false
+ override def dequeueSpeculativeTask(
+ execId: String,
+ host: String,
+ locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
+ if (!hasDequeuedSpeculatedTask) {
+ hasDequeuedSpeculatedTask = true
+ Some(0, TaskLocality.PROCESS_LOCAL)
+ } else {
+ None
+ }
+ }
+ }
+ }
+ }).when(mockTaskScheduler).createTaskSetManager(Matchers.any(), Matchers.any())
+
+ sc.taskScheduler = mockTaskScheduler
+ val dagSchedulerWithMockTaskScheduler = new DAGScheduler(sc, mockTaskScheduler)
+ sc.taskScheduler.setDAGScheduler(dagSchedulerWithMockTaskScheduler)
+ sc.dagScheduler = dagSchedulerWithMockTaskScheduler
+ }
+
+ after {
+ sc.stop()
+ tempDir.delete()
+ outputCommitCoordinator = null
+ }
+
+ test("Only one of two duplicate commit tasks should commit") {
+ val rdd = sc.parallelize(Seq(1), 1)
+ sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully _,
+ 0 until rdd.partitions.size, allowLocal = false)
+ assert(tempDir.list().size === 1)
+ }
+
+ test("If commit fails, if task is retried it should not be locked, and will succeed.") {
+ val rdd = sc.parallelize(Seq(1), 1)
+ sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _,
+ 0 until rdd.partitions.size, allowLocal = false)
+ assert(tempDir.list().size === 1)
+ }
+
+ test("Job should not complete if all commits are denied") {
+ // Create a mock OutputCommitCoordinator that denies all attempts to commit
+ doReturn(false).when(outputCommitCoordinator).handleAskPermissionToCommit(
+ Matchers.any(), Matchers.any(), Matchers.any())
+ val rdd: RDD[Int] = sc.parallelize(Seq(1), 1)
+ def resultHandler(x: Int, y: Unit): Unit = {}
+ val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd,
+ OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully,
+ 0 until rdd.partitions.size, resultHandler, 0)
+ // It's an error if the job completes successfully even though no committer was authorized,
+ // so throw an exception if the job was allowed to complete.
+ intercept[TimeoutException] {
+ Await.result(futureAction, 5 seconds)
+ }
+ assert(tempDir.list().size === 0)
+ }
+}
+
+/**
+ * Class with methods that can be passed to runJob to test commits with a mock committer.
+ */
+private case class OutputCommitFunctions(tempDirPath: String) {
+
+ // Mock output committer that simulates a successful commit (after commit is authorized)
+ private def successfulOutputCommitter = new FakeOutputCommitter {
+ override def commitTask(context: TaskAttemptContext): Unit = {
+ Utils.createDirectory(tempDirPath)
+ }
+ }
+
+ // Mock output committer that simulates a failed commit (after commit is authorized)
+ private def failingOutputCommitter = new FakeOutputCommitter {
+ override def commitTask(taskAttemptContext: TaskAttemptContext) {
+ throw new RuntimeException
+ }
+ }
+
+ def commitSuccessfully(iter: Iterator[Int]): Unit = {
+ val ctx = TaskContext.get()
+ runCommitWithProvidedCommitter(ctx, iter, successfulOutputCommitter)
+ }
+
+ def failFirstCommitAttempt(iter: Iterator[Int]): Unit = {
+ val ctx = TaskContext.get()
+ runCommitWithProvidedCommitter(ctx, iter,
+ if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
+ }
+
+ private def runCommitWithProvidedCommitter(
+ ctx: TaskContext,
+ iter: Iterator[Int],
+ outputCommitter: OutputCommitter): Unit = {
+ def jobConf = new JobConf {
+ override def getOutputCommitter(): OutputCommitter = outputCommitter
+ }
+ val sparkHadoopWriter = new SparkHadoopWriter(jobConf) {
+ override def newTaskAttemptContext(
+ conf: JobConf,
+ attemptId: TaskAttemptID): TaskAttemptContext = {
+ mock(classOf[TaskAttemptContext])
+ }
+ }
+ sparkHadoopWriter.setup(ctx.stageId, ctx.partitionId, ctx.attemptNumber)
+ sparkHadoopWriter.commit()
+ }
+}
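
For reference, the coordination exercised by the suite above is only active when `spark.speculation` is true, and (per the commit message) it can be bypassed with `spark.hadoop.outputCommitCoordination.enabled=false`. The minimal Scala sketch below shows how an application might set those properties; the application name, output path, and job body are placeholders, and the save call is just one example of a Hadoop-committing write that goes through SparkHadoopWriter.

import org.apache.spark.{SparkConf, SparkContext}

object CommitCoordinationConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("commit-coordination-example")
      .setMaster("local[4]")
      // The commit coordinator only participates when speculation is enabled.
      .set("spark.speculation", "true")
      // Uncomment to opt out of the driver-side commit coordination added by this patch.
      // .set("spark.hadoop.outputCommitCoordination.enabled", "false")

    val sc = new SparkContext(conf)
    try {
      // Placeholder job: saveAsTextFile writes through the old Hadoop output path
      // (SparkHadoopWriter), which is where commit permission is requested.
      sc.parallelize(1 to 100, 4)
        .map(i => (i, i * i))
        .saveAsTextFile("/tmp/commit-coordination-example")
    } finally {
      sc.stop()
    }
  }
}
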