Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala           |  25
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 213
2 files changed, 230 insertions(+), 8 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index eb116213f6..9d0c127369 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -208,7 +208,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
}
}
}
@@ -219,7 +219,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
- Map[Long, Any]((accumId, 1)), null, null))
+ Map[Long, Any]((accumId, 1)), createFakeTaskInfo(), null))
}
}
}
@@ -476,7 +476,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null,
Map[Long, Any](),
- null,
+ createFakeTaskInfo(),
null))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(1))
@@ -487,7 +487,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
null,
Map[Long, Any](),
- null,
+ createFakeTaskInfo(),
null))
// The SparkListener should not receive redundant failure events.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
@@ -507,14 +507,14 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(newEpoch > oldEpoch)
val taskSet = taskSets(0)
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
// should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -766,5 +766,14 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(scheduler.shuffleToMapStage.isEmpty)
assert(scheduler.waitingStages.isEmpty)
}
+
+ // Nothing in this test should break if the task info's fields are null, but
+ // OutputCommitCoordinator requires the task info itself to not be null.
+ private def createFakeTaskInfo(): TaskInfo = {
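+    // TaskInfo constructor args, in order: taskId, index, attempt, launchTime,
+    // executorId, host, locality, speculative. All are dummy values; only the
+    // non-nullness of the TaskInfo itself matters for these tests.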
+ val info = new TaskInfo(0, 0, 0, 0L, "", "", TaskLocality.ANY, false)
+ info.finishTime = 1 // to prevent spurious errors in JobProgressListener
+ info
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
new file mode 100644
index 0000000000..3cc860caa1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.File
+import java.util.concurrent.TimeoutException
+
+import org.mockito.Matchers
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.hadoop.mapred.{JobConf, OutputCommitter, TaskAttemptContext, TaskAttemptID}
+
+import org.apache.spark._
+import org.apache.spark.rdd.{RDD, FakeOutputCommitter}
+import org.apache.spark.util.Utils
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/**
+ * Unit tests for the output commit coordination functionality.
+ *
+ * The unit test makes both the original task and the speculated task
+ * attempt to commit, where committing is emulated by creating a
+ * directory. If both tasks create directories then the end result is
+ * a failure.
+ *
+ * Note that there are some aspects of this test that are less than ideal.
+ * In particular, the test mocks the speculation-dequeuing logic so that it
+ * always dequeues a task and treats it as speculated. Immediately after the
+ * tasks are initially submitted and reviveOffers() is called, reviveOffers()
+ * is invoked again to pick up the speculated task. This may exercise the
+ * scheduler in a way that is unrealistically different from its real behavior.
+ *
+ * Also, the validation is done by checking the number of files in a directory.
+ * Ideally, an accumulator would be used instead, incremented in the output
+ * committer's commitTask() call; if commitTask() were erroneously invoked
+ * twice, the test would fail because the accumulator would be incremented twice.
+ *
+ * The problem with that approach is that when both a speculated task and its
+ * original counterpart complete, only one of the accumulator's increments is
+ * captured. As a result, even if the OutputCommitCoordinator logic were absent
+ * from SparkHadoopWriter, such a test would still pass: only one increment
+ * would be observed even though both tasks had erroneously committed.
+ */
+class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
+
+ var outputCommitCoordinator: OutputCommitCoordinator = null
+ var tempDir: File = null
+ var sc: SparkContext = null
+
+ before {
+ tempDir = Utils.createTempDir()
+ val conf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
+ .set("spark.speculation", "true")
+ sc = new SparkContext(conf) {
+ override private[spark] def createSparkEnv(
+ conf: SparkConf,
+ isLocal: Boolean,
+ listenerBus: LiveListenerBus): SparkEnv = {
+ outputCommitCoordinator = spy(new OutputCommitCoordinator(conf))
+ // Use Mockito.spy() to maintain the default infrastructure everywhere else.
+ // This mocking allows us to control the coordinator responses in test cases.
+ SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))
+ }
+ }
+ // Use Mockito.spy() to maintain the default infrastructure everywhere else
+ val mockTaskScheduler = spy(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl])
+
+ doAnswer(new Answer[Unit]() {
+ override def answer(invoke: InvocationOnMock): Unit = {
+ // Submit the tasks, then force the task scheduler to dequeue the
+ // speculated task
+ invoke.callRealMethod()
+ mockTaskScheduler.backend.reviveOffers()
+ }
+ }).when(mockTaskScheduler).submitTasks(Matchers.any())
+
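+    // Stub createTaskSetManager() so that the first dequeueSpeculativeTask() call
+    // hands out a speculative copy of task 0 and every later call finds nothing,
+    // yielding exactly one duplicate attempt per task set.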
+ doAnswer(new Answer[TaskSetManager]() {
+ override def answer(invoke: InvocationOnMock): TaskSetManager = {
+ val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet]
+ new TaskSetManager(mockTaskScheduler, taskSet, 4) {
+ var hasDequeuedSpeculatedTask = false
+ override def dequeueSpeculativeTask(
+ execId: String,
+ host: String,
+ locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
+ if (!hasDequeuedSpeculatedTask) {
+ hasDequeuedSpeculatedTask = true
+            Some((0, TaskLocality.PROCESS_LOCAL))
+ } else {
+ None
+ }
+ }
+ }
+ }
+ }).when(mockTaskScheduler).createTaskSetManager(Matchers.any(), Matchers.any())
+
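+    // Swap the spied scheduler into the SparkContext and pair it with a fresh
+    // DAGScheduler so that jobs submitted through sc see the mocked speculation.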
+ sc.taskScheduler = mockTaskScheduler
+ val dagSchedulerWithMockTaskScheduler = new DAGScheduler(sc, mockTaskScheduler)
+ sc.taskScheduler.setDAGScheduler(dagSchedulerWithMockTaskScheduler)
+ sc.dagScheduler = dagSchedulerWithMockTaskScheduler
+ }
+
+ after {
+ sc.stop()
+    Utils.deleteRecursively(tempDir)
+ outputCommitCoordinator = null
+ }
+
+ test("Only one of two duplicate commit tasks should commit") {
+ val rdd = sc.parallelize(Seq(1), 1)
+ sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully _,
+ 0 until rdd.partitions.size, allowLocal = false)
+ assert(tempDir.list().size === 1)
+ }
+
+ test("If commit fails, if task is retried it should not be locked, and will succeed.") {
+ val rdd = sc.parallelize(Seq(1), 1)
+ sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _,
+ 0 until rdd.partitions.size, allowLocal = false)
+ assert(tempDir.list().size === 1)
+ }
+
+ test("Job should not complete if all commits are denied") {
+ // Create a mock OutputCommitCoordinator that denies all attempts to commit
+ doReturn(false).when(outputCommitCoordinator).handleAskPermissionToCommit(
+ Matchers.any(), Matchers.any(), Matchers.any())
+ val rdd: RDD[Int] = sc.parallelize(Seq(1), 1)
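+    // No-op result handler; this test only cares whether the job ever finishes.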
+ def resultHandler(x: Int, y: Unit): Unit = {}
+ val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd,
+ OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully,
+ 0 until rdd.partitions.size, resultHandler, 0)
+    // It's an error if the job completes successfully even though no committer was
+    // authorized; the job should hang instead, so the Await below is expected to
+    // time out rather than return.
+ intercept[TimeoutException] {
+ Await.result(futureAction, 5 seconds)
+ }
+ assert(tempDir.list().size === 0)
+ }
+}
+
+/**
+ * Class with methods that can be passed to runJob to test commits with a mock committer.
+ */
+private case class OutputCommitFunctions(tempDirPath: String) {
+
+ // Mock output committer that simulates a successful commit (after commit is authorized)
+ private def successfulOutputCommitter = new FakeOutputCommitter {
+ override def commitTask(context: TaskAttemptContext): Unit = {
+ Utils.createDirectory(tempDirPath)
+ }
+ }
+
+ // Mock output committer that simulates a failed commit (after commit is authorized)
+ private def failingOutputCommitter = new FakeOutputCommitter {
+    override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
+ throw new RuntimeException
+ }
+ }
+
+ def commitSuccessfully(iter: Iterator[Int]): Unit = {
+ val ctx = TaskContext.get()
+ runCommitWithProvidedCommitter(ctx, iter, successfulOutputCommitter)
+ }
+
+ def failFirstCommitAttempt(iter: Iterator[Int]): Unit = {
+ val ctx = TaskContext.get()
+ runCommitWithProvidedCommitter(ctx, iter,
+ if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
+ }
+
+ private def runCommitWithProvidedCommitter(
+ ctx: TaskContext,
+ iter: Iterator[Int],
+ outputCommitter: OutputCommitter): Unit = {
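+    // JobConf that hands back the committer under test instead of a real one.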
+ def jobConf = new JobConf {
+ override def getOutputCommitter(): OutputCommitter = outputCommitter
+ }
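+    // Stub newTaskAttemptContext() so the commit path never touches real Hadoop
+    // output machinery.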
+ val sparkHadoopWriter = new SparkHadoopWriter(jobConf) {
+ override def newTaskAttemptContext(
+ conf: JobConf,
+ attemptId: TaskAttemptID): TaskAttemptContext = {
+ mock(classOf[TaskAttemptContext])
+ }
+ }
+ sparkHadoopWriter.setup(ctx.stageId, ctx.partitionId, ctx.attemptNumber)
+ sparkHadoopWriter.commit()
+ }
+}