aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-01-04 21:09:21 -0800
committerReynold Xin <rxin@databricks.com>2015-01-04 21:09:21 -0800
commit27e7f5a7237d9d64a3b2c8a030ba3e3a9a96b26c (patch)
treeb0fb6bf973531ab7d6d0e7cf6a67546a6df9fb64 /core
parent6c726a3fbd9cd3aa5f3a1992b2132b25eabb76a0 (diff)
downloadspark-27e7f5a7237d9d64a3b2c8a030ba3e3a9a96b26c.tar.gz
spark-27e7f5a7237d9d64a3b2c8a030ba3e3a9a96b26c.tar.bz2
spark-27e7f5a7237d9d64a3b2c8a030ba3e3a9a96b26c.zip
[SPARK-5083][Core] Fix a flaky test in TaskResultGetterSuite
Because `sparkEnv.blockManager.master.removeBlock` is asynchronous, we need to make sure the block has already been removed before calling `super.enqueueSuccessfulTask`. Author: zsxwing <zsxwing@gmail.com> Closes #3894 from zsxwing/SPARK-5083 and squashes the following commits: d97c03d [zsxwing] Fix a flaky test in TaskResultGetterSuite
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala22
1 file changed, 20 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 3aab5a156e..e3a3803e64 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -19,7 +19,12 @@ package org.apache.spark.scheduler
import java.nio.ByteBuffer
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.TaskResultBlockId
@@ -34,6 +39,8 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
extends TaskResultGetter(sparkEnv, scheduler) {
var removedResult = false
+ @volatile var removeBlockSuccessfully = false
+
override def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
if (!removedResult) {
@@ -42,6 +49,15 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
serializer.get().deserialize[TaskResult[_]](serializedData) match {
case IndirectTaskResult(blockId, size) =>
sparkEnv.blockManager.master.removeBlock(blockId)
+ // removeBlock is asynchronous. Need to wait until it's removed successfully
+ try {
+ eventually(timeout(3 seconds), interval(200 milliseconds)) {
+ assert(!sparkEnv.blockManager.master.contains(blockId))
+ }
+ removeBlockSuccessfully = true
+ } catch {
+ case NonFatal(e) => removeBlockSuccessfully = false
+ }
case directResult: DirectTaskResult[_] =>
taskSetManager.abort("Internal error: expect only indirect results")
}
@@ -92,10 +108,12 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSpark
assert(false, "Expect local cluster to use TaskSchedulerImpl")
throw new ClassCastException
}
- scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+ val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+ scheduler.taskResultGetter = resultGetter
val akkaFrameSize =
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ assert(resultGetter.removeBlockSuccessfully)
assert(result === 1.to(akkaFrameSize).toArray)
// Make sure two tasks were run (one failed one, and a second retried one).