aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/scheduler/ResultTask.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/scheduler/ResultTask.scala')
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala6
1 files changed, 4 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 2ebd4075a2..e492279b4e 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -10,12 +10,14 @@ private[spark] class ResultTask[T, U](
@transient locs: Seq[String],
val outputId: Int)
extends Task[U](stageId) {
-
+
val split = rdd.splits(partition)
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
- func(context, rdd.iterator(split))
+ val result = func(context, rdd.iterator(split, context))
+ context.executeOnCompleteCallbacks()
+ result
}
override def preferredLocations: Seq[String] = locs