author     Reynold Xin <rxin@apache.org>    2014-09-12 21:55:39 -0700
committer  Reynold Xin <rxin@apache.org>    2014-09-12 21:55:39 -0700
commit     2584ea5b23b1c5a4df9549b94bfc9b8e0900532e (patch)
tree       915dda0c0c1376178113b614094d874c1749e400 /core
parent     6d887db7891be643f0131b136e82191b5f6eb407 (diff)
[SPARK-3469] Make sure all TaskCompletionListeners are called even with failures
This is necessary because we rely on this callback interface to clean up resources; the old behavior could leak resources when a failing listener prevented later listeners from running.

Note that this also changes the fault semantics of TaskCompletionListener: previously, a failure in any TaskCompletionListener caused the task to be reported as failed immediately, skipping any remaining listeners. With this change, every listener is invoked, and any exceptions are reported at the end as a single TaskCompletionListenerException that aggregates all of the individual exception messages.

Author: Reynold Xin <rxin@apache.org>

Closes #2343 from rxin/taskcontext-callback and squashes the following commits:

a3845b2 [Reynold Xin] Mark TaskCompletionListenerException as private[spark].
ac5baea [Reynold Xin] Removed obsolete comment.
aa68ea4 [Reynold Xin] Throw an exception if task completion callback fails.
29b6162 [Reynold Xin] oops compilation failed.
1cb444d [Reynold Xin] [SPARK-3469] Call all TaskCompletionListeners even if some fail.
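To make the new semantics concrete, here is a minimal sketch (not part of the commit) that mirrors the test added below. The object name is hypothetical; the sketch assumes the TaskContext(stageId, partitionId, attemptId) constructor and the function-style addTaskCompletionListener overload that appear in this diff, and it has to live under the org.apache.spark package because markTaskCompleted and TaskCompletionListenerException are private[spark].

package org.apache.spark

import org.apache.spark.util.TaskCompletionListenerException

object CompletionSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val context = new TaskContext(0, 0, 0)

    var middleRan = false
    context.addTaskCompletionListener(_ => throw new Exception("first failure"))
    context.addTaskCompletionListener(_ => middleRan = true)
    context.addTaskCompletionListener(_ => throw new Exception("second failure"))

    try {
      context.markTaskCompleted()
    } catch {
      case e: TaskCompletionListenerException =>
        // All three listeners ran before anything was thrown.
        assert(middleRan)
        // Listeners fire in reverse registration order, so the aggregate reads:
        //   Exception 0: second failure
        //   Exception 1: first failure
        println(e.getMessage)
    }
  }
}

Note that only the listeners' messages survive into the aggregated exception; the full stack traces go to the executor log via logError.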
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala                          | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala | 34
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala           | 22
3 files changed, 69 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 2b99b8a5af..51b3e4d5e0 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}
 
 
 /**
@@ -41,7 +41,7 @@ class TaskContext(
     val attemptId: Long,
     val runningLocally: Boolean = false,
     private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
-  extends Serializable {
+  extends Serializable with Logging {
 
   @deprecated("use partitionId", "0.8.1")
   def splitId = partitionId
@@ -103,8 +103,20 @@ class TaskContext(
   /** Marks the task as completed and triggers the listeners. */
   private[spark] def markTaskCompleted(): Unit = {
     completed = true
+    val errorMsgs = new ArrayBuffer[String](2)
     // Process complete callbacks in the reverse order of registration
-    onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
+    onCompleteCallbacks.reverse.foreach { listener =>
+      try {
+        listener.onTaskCompletion(this)
+      } catch {
+        case e: Throwable =>
+          errorMsgs += e.getMessage
+          logError("Error in TaskCompletionListener", e)
+      }
+    }
+    if (errorMsgs.nonEmpty) {
+      throw new TaskCompletionListenerException(errorMsgs)
+    }
   }
 
   /** Marks the task for interruption, i.e. cancellation. */
diff --git a/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
new file mode 100644
index 0000000000..f64e069cd1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Exception thrown when there is an exception in
+ * executing the callback in TaskCompletionListener.
+ */
+private[spark]
+class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception {
+
+  override def getMessage: String = {
+    if (errorMessages.size == 1) {
+      errorMessages.head
+    } else {
+      errorMessages.zipWithIndex.map { case (msg, i) => s"Exception $i: $msg" }.mkString("\n")
+    }
+  }
+}
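For reference, a quick sketch of how getMessage formats the collected messages (hypothetical object name; it only compiles from inside the org.apache.spark namespace because the class is private[spark]):

package org.apache.spark.util

object MessageFormatSketch extends App {
  // A single message is passed through verbatim.
  println(new TaskCompletionListenerException(Seq("boom")).getMessage)
  // prints: boom

  // Multiple messages are numbered in execution order and joined with newlines.
  println(new TaskCompletionListenerException(Seq("boom", "bang")).getMessage)
  // prints: Exception 0: boom
  //         Exception 1: bang
}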
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index db2ad829a4..faba5508c9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.scheduler
 
+import org.mockito.Mockito._
+import org.mockito.Matchers.any
+
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}
+
 
 class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
-  test("Calls executeOnCompleteCallbacks after failure") {
+  test("calls TaskCompletionListener after failure") {
     TaskContextSuite.completed = false
     sc = new SparkContext("local", "test")
     val rdd = new RDD[String](sc, List()) {
@@ -45,6 +49,20 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     }
     assert(TaskContextSuite.completed === true)
   }
+
+  test("all TaskCompletionListeners should be called even if some fail") {
+    val context = new TaskContext(0, 0, 0)
+    val listener = mock(classOf[TaskCompletionListener])
+    context.addTaskCompletionListener(_ => throw new Exception("blah"))
+    context.addTaskCompletionListener(listener)
+    context.addTaskCompletionListener(_ => throw new Exception("blah"))
+
+    intercept[TaskCompletionListenerException] {
+      context.markTaskCompleted()
+    }
+
+    verify(listener, times(1)).onTaskCompletion(any())
+  }
 }
 
 private object TaskContextSuite {