From 27e7f5a7237d9d64a3b2c8a030ba3e3a9a96b26c Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 4 Jan 2015 21:09:21 -0800
Subject: [SPARK-5083][Core] Fix a flaky test in TaskResultGetterSuite

Because `sparkEnv.blockManager.master.removeBlock` is asynchronous, we need to
make sure the block has already been removed before calling
`super.enqueueSuccessfulTask`.

Author: zsxwing

Closes #3894 from zsxwing/SPARK-5083 and squashes the following commits:

d97c03d [zsxwing] Fix a flaky test in TaskResultGetterSuite
---
 .../spark/scheduler/TaskResultGetterSuite.scala    | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 3aab5a156e..e3a3803e64 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -19,7 +19,12 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.storage.TaskResultBlockId
@@ -34,6 +39,8 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
   extends TaskResultGetter(sparkEnv, scheduler) {
   var removedResult = false
 
+  @volatile var removeBlockSuccessfully = false
+
   override def enqueueSuccessfulTask(
     taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     if (!removedResult) {
@@ -42,6 +49,15 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
       serializer.get().deserialize[TaskResult[_]](serializedData) match {
         case IndirectTaskResult(blockId, size) =>
           sparkEnv.blockManager.master.removeBlock(blockId)
+          // removeBlock is asynchronous. Need to wait it's removed successfully
+          try {
+            eventually(timeout(3 seconds), interval(200 milliseconds)) {
+              assert(!sparkEnv.blockManager.master.contains(blockId))
+            }
+            removeBlockSuccessfully = true
+          } catch {
+            case NonFatal(e) => removeBlockSuccessfully = false
+          }
         case directResult: DirectTaskResult[_] =>
           taskSetManager.abort("Internal error: expect only indirect results")
       }
@@ -92,10 +108,12 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSpark
         assert(false, "Expect local cluster to use TaskSchedulerImpl")
         throw new ClassCastException
     }
-    scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+    val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+    scheduler.taskResultGetter = resultGetter
     val akkaFrameSize =
       sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
     val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    assert(resultGetter.removeBlockSuccessfully)
    assert(result === 1.to(akkaFrameSize).toArray)
 
     // Make sure two tasks were run (one failed one, and a second retried one).
-- 
cgit v1.2.3
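
Outside the patch itself, a minimal self-contained sketch of the polling pattern the fix relies on: ScalaTest's `eventually` retries an assertion at a fixed interval until an asynchronous removal becomes observable or the timeout expires, and the outcome is recorded in a flag that can be asserted on later from the test thread. The `AsyncStore` class, its `removeAsync` method, and the `removedSuccessfully` variable below are hypothetical stand-ins for the block manager master interaction, not Spark APIs; the sketch also assumes a ScalaTest version, as in the suite above, where a `FiniteDuration` converts implicitly to a `Span`.

import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NonFatal

import org.scalatest.concurrent.Eventually._

// Hypothetical stand-in for an asynchronous store such as the block manager master.
class AsyncStore {
  @volatile private var present = true

  // Simulate a removal that only completes later, on a background thread.
  def removeAsync(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(100)
        present = false
      }
    }).start()
  }

  def contains: Boolean = present
}

object EventuallyPollingSketch {
  def main(args: Array[String]): Unit = {
    val store = new AsyncStore
    store.removeAsync()

    // Mirror the patch: retry the assertion every 200 ms for up to 3 seconds,
    // and record whether the removal became visible before the timeout.
    var removedSuccessfully = false
    try {
      eventually(timeout(3 seconds), interval(200 milliseconds)) {
        assert(!store.contains)
      }
      removedSuccessfully = true
    } catch {
      case NonFatal(_) => removedSuccessfully = false
    }
    println(s"removedSuccessfully = $removedSuccessfully")
  }
}

Recording the outcome in a flag rather than asserting inside the result getter appears to be why the patch introduces `removeBlockSuccessfully` and checks it in the test body: a failure raised on the getter's own thread would not fail the test directly, whereas the flag makes it visible where the test can assert on it.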