aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala24
1 files changed, 14 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index e5ecd4b7c2..6d08d7c5b7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
* was not in SparkHadoopWriter, the tests would still pass because only one of the
* increments would be captured even though the commit in both tasks was executed
* erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
+ * not use mocks.
*/
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
- val partition: Long = 2
- val authorizedCommitter: Long = 3
- val nonAuthorizedCommitter: Long = 100
+ val partition: Int = 2
+ val authorizedCommitter: Int = 3
+ val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
- assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
- assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+
+ assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
// The non-authorized committer fails
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
// New tasks should still not be able to commit because the authorized committer has not failed
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
// A new task should now be allowed to become the authorized committer
assert(
- outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+ outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
}