aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/FutureAction.scala19
1 files changed, 17 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 75ea535f2f..e8f761eaa5 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -83,6 +83,15 @@ trait FutureAction[T] extends Future[T] {
*/
@throws(classOf[Exception])
def get(): T = Await.result(this, Duration.Inf)
+
+ /**
+ * Returns the job IDs run by the underlying async operation.
+ *
+ * This returns the current snapshot of the job list. Certain operations may run multiple
+ * jobs, so multiple calls to this method may return different lists.
+ */
+ def jobIds: Seq[Int]
+
}
@@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
}
- /** Get the corresponding job id for this action. */
- def jobId = jobWaiter.jobId
+ def jobIds = Seq(jobWaiter.jobId)
}
@@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// is cancelled before the action was even run (and thus we have no thread to interrupt).
@volatile private var _cancelled: Boolean = false
+ @volatile private var jobs: Seq[Int] = Nil
+
// A promise used to signal the future.
private val p = promise[T]()
@@ -219,6 +229,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}
+ this.jobs = jobs ++ job.jobIds
+
// Wait for the job to complete. If the action is cancelled (with an interrupt),
// cancel the job and stop the execution. This is not in a synchronized block because
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
@@ -255,4 +267,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
override def isCompleted: Boolean = p.isCompleted
override def value: Option[Try[T]] = p.future.value
+
+ def jobIds = jobs
+
}