diff options
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 8c4e389e86..0c362b881d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -189,6 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { assert( !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3)) } + + test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") { + val rdd = sc.parallelize(Seq(1), 1) + sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _, + 0 until rdd.partitions.size) + } } /** @@ -221,6 +227,16 @@ private case class OutputCommitFunctions(tempDirPath: String) { if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter) } + // Receiver should be idempotent for AskPermissionToCommitOutput + def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = { + val ctx = TaskContext.get() + val canCommit1 = SparkEnv.get.outputCommitCoordinator + .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber()) + val canCommit2 = SparkEnv.get.outputCommitCoordinator + .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber()) + assert(canCommit1 && canCommit2) + } + private def runCommitWithProvidedCommitter( ctx: TaskContext, iter: Iterator[Int], |