aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/FutureAction.scala
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-10-09 22:43:06 -0700
committerReynold Xin <rxin@apache.org>2013-10-09 22:43:06 -0700
commit53895f9cdedea82d816bdc546f40914195c367ba (patch)
tree6e8b9eef74451c452aad3232f0d3b01d8643f369 /core/src/main/scala/org/apache/spark/FutureAction.scala
parentd29e8035a0e44a0eddf19a153cbae9f9ac2a223a (diff)
downloadspark-53895f9cdedea82d816bdc546f40914195c367ba.tar.gz
spark-53895f9cdedea82d816bdc546f40914195c367ba.tar.bz2
spark-53895f9cdedea82d816bdc546f40914195c367ba.zip
Implemented FutureAction, FutureJob, CancellablePromise.
Implemented more unit tests for async actions.
Diffstat (limited to 'core/src/main/scala/org/apache/spark/FutureAction.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/FutureAction.scala232
1 files changed, 232 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
new file mode 100644
index 0000000000..465cc1fa7d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.Try
+
+import org.apache.spark.scheduler.{JobSucceeded, JobWaiter}
+import org.apache.spark.scheduler.JobFailed
+
+
+/**
+ * A future for the result of an action. This is an extension of the Scala Future interface to
+ * support cancellation.
+ */
+trait FutureAction[T] extends Future[T] {
+
+ /**
+ * Cancels the execution of this action.
+ */
+ def cancel()
+
+ /**
+ * Blocks until this action completes.
+ * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
+ * for unbounded waiting, or a finite positive duration
+ * @return this FutureAction
+ */
+ override def ready(atMost: Duration)(implicit permit: CanAwait): FutureAction.this.type
+
+ /**
+ * Await and return the result (of type T) of this action.
+ * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
+ * for unbounded waiting, or a finite positive duration
+ * @throws Exception exception during action execution
+ * @return the result value if the action is completed within the specific maximum wait time
+ */
+ @throws(classOf[Exception])
+ override def result(atMost: Duration)(implicit permit: CanAwait): T
+
+ /**
+ * When this action is completed, either through an exception, or a value, apply the provided
+ * function.
+ */
+ def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
+
+ /**
+ * Returns whether the action has already been completed with a value or an exception.
+ */
+ override def isCompleted: Boolean
+
+ /**
+ * The value of this Future.
+ *
+ * If the future is not completed the returned value will be None. If the future is completed
+ * the value will be Some(Success(t)) if it contains a valid result, or Some(Failure(error)) if
+ * it contains an exception.
+ */
+ override def value: Option[Try[T]]
+
+ /**
+ * Block and return the result of this job.
+ */
+ @throws(classOf[Exception])
+ def get(): T = Await.result(this, Duration.Inf)
+}
+
+
+/**
+ * The future holding the result of an action that triggers a single job. Examples include
+ * count, collect, reduce.
+ */
+class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
+ extends FutureAction[T] {
+
+ override def cancel() {
+ jobWaiter.kill()
+ }
+
+ override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {
+ if (!atMost.isFinite()) {
+ awaitResult()
+ } else {
+ val finishTime = System.currentTimeMillis() + atMost.toMillis
+ while (!isCompleted) {
+ val time = System.currentTimeMillis()
+ if (time >= finishTime) {
+ throw new TimeoutException
+ } else {
+ jobWaiter.wait(finishTime - time)
+ }
+ }
+ }
+ this
+ }
+
+ @throws(classOf[Exception])
+ override def result(atMost: Duration)(implicit permit: CanAwait): T = {
+ ready(atMost)(permit)
+ awaitResult() match {
+ case scala.util.Success(res) => res
+ case scala.util.Failure(e) => throw e
+ }
+ }
+
+ override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
+ executor.execute(new Runnable {
+ override def run() {
+ func(awaitResult())
+ }
+ })
+ }
+
+ override def isCompleted: Boolean = jobWaiter.jobFinished
+
+ override def value: Option[Try[T]] = {
+ if (jobWaiter.jobFinished) {
+ Some(awaitResult())
+ } else {
+ None
+ }
+ }
+
+ private def awaitResult(): Try[T] = {
+ jobWaiter.awaitResult() match {
+ case JobSucceeded => scala.util.Success(resultFunc)
+ case JobFailed(e: Exception, _) => scala.util.Failure(e)
+ }
+ }
+}
+
+
+/**
+ * A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
+ * takeSample.
+ *
+ * This is implemented as a Scala Promise that can be cancelled. Note that the promise itself is
+ * also its own Future (i.e. this.future returns this). See the implementation of takeAsync for
+ * usage.
+ */
+class CancellablePromise[T] extends FutureAction[T] with Promise[T] {
+ // Cancellation works by setting the cancelled flag to true and interrupt the action thread
+ // if it is in progress. Before executing the action, the execution thread needs to check the
+ // cancelled flag in case cancel() is called before the thread even starts to execute. Because
+ // this and the execution thread is synchronized on the same promise object (this), the actual
+ // cancellation/interrupt event can only be triggered when the execution thread is waiting for
+ // the result of a job.
+
+ override def cancel(): Unit = this.synchronized {
+ _cancelled = true
+ if (thread != null) {
+ thread.interrupt()
+ }
+ }
+
+ /**
+ * Executes some action enclosed in the closure. This execution of func is wrapped in a
+ * synchronized block to guarantee that this promise can only be cancelled when the task is
+ * waiting for
+ */
+ def run(func: => T)(implicit executor: ExecutionContext): Unit = scala.concurrent.future {
+ thread = Thread.currentThread
+ try {
+ this.success(this.synchronized {
+ if (cancelled) {
+ // This action has been cancelled before this thread even started running.
+ throw new InterruptedException
+ }
+ func
+ })
+ } catch {
+ case e: Exception => this.failure(e)
+ } finally {
+ thread = null
+ }
+ }
+
+ /**
+ * Returns whether the promise has been cancelled.
+ */
+ def cancelled: Boolean = _cancelled
+
+ // Pointer to the thread that is executing the action. It is set when the action is run.
+ @volatile private var thread: Thread = _
+
+ // A flag indicating whether the future has been cancelled. This is used in case the future
+ // is cancelled before the action was even run (and thus we have no thread to interrupt).
+ @volatile private var _cancelled: Boolean = false
+
+ // Internally, we delegate most functionality to this promise.
+ private val p = promise[T]()
+
+ override def future: this.type = this
+
+ override def tryComplete(result: Try[T]): Boolean = p.tryComplete(result)
+
+ @scala.throws(classOf[InterruptedException])
+ @scala.throws(classOf[scala.concurrent.TimeoutException])
+ override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
+ p.future.ready(atMost)(permit)
+ this
+ }
+
+ @scala.throws(classOf[Exception])
+ override def result(atMost: Duration)(implicit permit: CanAwait): T = {
+ p.future.result(atMost)(permit)
+ }
+
+ override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
+ p.future.onComplete(func)(executor)
+ }
+
+ override def isCompleted: Boolean = p.isCompleted
+
+ override def value: Option[Try[T]] = p.future.value
+}