aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala16
1 files changed, 16 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 8c4e389e86..0c362b881d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -189,6 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
+
+ test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
+ val rdd = sc.parallelize(Seq(1), 1)
+ sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
+ 0 until rdd.partitions.size)
+ }
}
/**
@@ -221,6 +227,16 @@ private case class OutputCommitFunctions(tempDirPath: String) {
if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
}
+ // Receiver should be idempotent for AskPermissionToCommitOutput
+ def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
+ val ctx = TaskContext.get()
+ val canCommit1 = SparkEnv.get.outputCommitCoordinator
+ .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+ val canCommit2 = SparkEnv.get.outputCommitCoordinator
+ .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+ assert(canCommit1 && canCommit2)
+ }
+
private def runCommitWithProvidedCommitter(
ctx: TaskContext,
iter: Iterator[Int],