author     Reynold Xin <rxin@apache.org>   2013-12-16 14:16:02 -0800
committer  Reynold Xin <rxin@apache.org>   2013-12-16 14:16:02 -0800
commit     883e034aebe61a25631497b4e299a8f2e3389b00 (patch)
tree       d612829fb3ee15f3ba75700bc9cd730e5e6c01da
parent     a51f3404ad8711f5fe66381122c5fa1ead09b3da (diff)
parent     558af873340087cad79630ec5c498672c5ea3c4f (diff)
Merge pull request #245 from gregakespret/task-maxfailures-fix
Fix for spark.task.maxFailures not being enforced correctly.

The docs at http://spark.incubator.apache.org/docs/latest/configuration.html say:

```
spark.task.maxFailures    Number of individual task failures before giving up on
                          the job. Should be greater than or equal to 1. Number of
                          allowed retries = this value - 1.
```

The previous implementation was off by one: with `spark.task.maxFailures` set to 1, for example, the job was aborted only after the second failure of a task, not after the first.
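To see the off-by-one concretely, here is a minimal, self-contained Scala sketch of the boundary behavior; this is not Spark code, and the names `abortsAfter` and `maxFailures` are illustrative stand-ins for the scheduler's failure counting:

```scala
// Minimal sketch (not Spark code): count successive failures of one task
// and report at which failure count each comparison would abort the job.
object MaxFailuresDemo {
  // Returns the first failure count for which `check` says "abort".
  def abortsAfter(check: (Int, Int) => Boolean, maxFailures: Int): Int =
    Iterator.from(1).find(failures => check(failures, maxFailures)).get

  def main(args: Array[String]): Unit = {
    val maxFailures = 1  // spark.task.maxFailures = 1: zero retries allowed
    println(abortsAfter(_ > _,  maxFailures))  // old check (>):    aborts after failure 2
    println(abortsAfter(_ >= _, maxFailures))  // fixed check (>=): aborts after failure 1
  }
}
```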
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala       6
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala                               2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala   2
3 files changed, 5 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 94961790df..bf494aa64d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -529,10 +529,10 @@ private[spark] class ClusterTaskSetManager(
       addPendingTask(index)
       if (state != TaskState.KILLED) {
         numFailures(index) += 1
-        if (numFailures(index) > MAX_TASK_FAILURES) {
-          logError("Task %s:%d failed more than %d times; aborting job".format(
+        if (numFailures(index) >= MAX_TASK_FAILURES) {
+          logError("Task %s:%d failed %d times; aborting job".format(
             taskSet.id, index, MAX_TASK_FAILURES))
-          abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
+          abort("Task %s:%d failed %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
         }
       }
     } else {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cffb74de..d9cb7fead5 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -122,7 +122,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
     }
     assert(thrown.getClass === classOf[SparkException])
-    assert(thrown.getMessage.contains("more than 4 times"))
+    assert(thrown.getMessage.contains("failed 4 times"))
   }

   test("caching") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index b97f2b19b5..29c4cc5d9c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -283,7 +283,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
     // after the last failure.
-    (0 until manager.MAX_TASK_FAILURES).foreach { index =>
+    (1 to manager.MAX_TASK_FAILURES).foreach { index =>
       val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
       assert(offerResult != None,
         "Expect resource offer on iteration %s to return a task".format(index))