author    Josh Rosen <joshrosen@apache.org>    2014-10-19 20:02:31 -0700
committer Josh Rosen <joshrosen@databricks.com>    2014-10-19 20:02:31 -0700
commit    d1966f3a8bafdcef87d10ef9db5976cf89faee4b (patch)
tree      3a0c9c7a65e0f1f462a8aa783ad56198669b0261 /core/src/main/scala/org
parent    7e63bb49c526c3f872619ae14e4b5273f4c535e9 (diff)
[SPARK-3902] [SPARK-3590] Stabilize AsyncRDDActions and add Java API

This PR adds a Java API for AsyncRDDActions and promotes the API from `Experimental` to stable.

Author: Josh Rosen <joshrosen@apache.org>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #2760 from JoshRosen/async-rdd-actions-in-java and squashes the following commits:

0d45fbc [Josh Rosen] Whitespace fix.
ad3ae53 [Josh Rosen] Merge remote-tracking branch 'origin/master' into async-rdd-actions-in-java
c0153a5 [Josh Rosen] Remove unused variable.
e8e2867 [Josh Rosen] Updates based on Marcelo's review feedback
7a1417f [Josh Rosen] Removed unnecessary java.util import.
6f8f6ac [Josh Rosen] Fix import ordering.
ff28e49 [Josh Rosen] Add MiMa excludes and fix a scalastyle error.
346e46e [Josh Rosen] [SPARK-3902] Stabilize AsyncRDDActions; add Java API.
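For a sense of how the promoted API is consumed, here is a minimal Java sketch. It is illustrative only: the class name, app name, master URL, and sample data are hypothetical and not part of this patch; only countAsync(), JavaFutureAction, and the timed get() come from the changes below.

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AsyncCountExample {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("async-count").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // Submit the count job without blocking the calling thread.
    JavaFutureAction<Long> future = rdd.countAsync();

    // Wait at most 30 seconds; a java.util.concurrent.TimeoutException
    // is thrown if the job has not finished by then.
    Long count = future.get(30, TimeUnit.SECONDS);
    System.out.println("count = " + count);

    sc.stop();
  }
}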
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/FutureAction.scala         | 86
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 53
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala  |  3
3 files changed, 112 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index e8f761eaa5..d5c8f9d76c 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,20 +17,21 @@
package org.apache.spark
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.Try
+import java.util.Collections
+import java.util.concurrent.TimeUnit
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Try}
+
/**
- * :: Experimental ::
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
-@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -70,6 +71,11 @@ trait FutureAction[T] extends Future[T] {
override def isCompleted: Boolean
/**
+ * Returns whether the action has been cancelled.
+ */
+ def isCancelled: Boolean
+
+ /**
* The value of this Future.
*
* If the future is not completed the returned value will be None. If the future is completed
@@ -96,15 +102,16 @@ trait FutureAction[T] extends Future[T] {
/**
- * :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
-@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {
+ @volatile private var _cancelled: Boolean = false
+
override def cancel() {
+ _cancelled = true
jobWaiter.cancel()
}
@@ -143,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
override def isCompleted: Boolean = jobWaiter.jobFinished
+
+ override def isCancelled: Boolean = _cancelled
override def value: Option[Try[T]] = {
if (jobWaiter.jobFinished) {
@@ -164,12 +173,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
/**
- * :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
-@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {
// Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +229,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
- if (!cancelled) {
+ if (!isCancelled) {
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
} else {
throw new SparkException("Action has been cancelled")
@@ -243,10 +250,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}
- /**
- * Returns whether the promise has been cancelled.
- */
- def cancelled: Boolean = _cancelled
+ override def isCancelled: Boolean = _cancelled
@throws(classOf[InterruptedException])
@throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +275,55 @@ class ComplexFutureAction[T] extends FutureAction[T] {
def jobIds = jobs
}
+
+private[spark]
+class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
+ extends JavaFutureAction[T] {
+
+ import scala.collection.JavaConverters._
+
+ override def isCancelled: Boolean = futureAction.isCancelled
+
+ override def isDone: Boolean = {
+ // According to java.util.Future's Javadoc, this returns True if the task was completed,
+ // whether that completion was due to successful execution, an exception, or a cancellation.
+ futureAction.isCancelled || futureAction.isCompleted
+ }
+
+ override def jobIds(): java.util.List[java.lang.Integer] = {
+ Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
+ }
+
+ private def getImpl(timeout: Duration): T = {
+ // This will throw TimeoutException on timeout:
+ Await.ready(futureAction, timeout)
+ futureAction.value.get match {
+ case scala.util.Success(value) => converter(value)
+ case Failure(exception) =>
+ if (isCancelled) {
+ throw new CancellationException("Job cancelled").initCause(exception)
+ } else {
+ // java.util.Future.get() wraps exceptions in ExecutionException
+ throw new ExecutionException("Exception thrown by job", exception)
+ }
+ }
+ }
+
+ override def get(): T = getImpl(Duration.Inf)
+
+ override def get(timeout: Long, unit: TimeUnit): T =
+ getImpl(Duration.fromNanos(unit.toNanos(timeout)))
+
+ override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
+ if (isDone) {
+ // According to java.util.Future's Javadoc, this should return false if the task is completed.
+ false
+ } else {
+ // We're limited in terms of the semantics we can provide here; our cancellation is
+ // asynchronous and doesn't provide a mechanism to not cancel if the job is running.
+ futureAction.cancel()
+ true
+ }
+ }
+
+}
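The wrapper above maps Spark's asynchronous cancellation onto the java.util.concurrent.Future contract: isDone() is true once the action has completed or been cancelled, get() wraps job failures in ExecutionException and reports cancellation as CancellationException, and cancel() returns false when the action has already finished. A hedged sketch of a caller observing those semantics (the helper class and method names are hypothetical):

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

import org.apache.spark.api.java.JavaFutureAction;

public final class FutureActionHelpers {
  /** Waits for an action and reports each of the three possible outcomes. */
  public static <T> T awaitAction(JavaFutureAction<T> future) throws InterruptedException {
    try {
      return future.get();  // blocks until the backing Spark job(s) finish
    } catch (CancellationException e) {
      System.err.println("action was cancelled");  // raised after cancel() takes effect
      return null;
    } catch (ExecutionException e) {
      // get() wraps the exception thrown by the job, per java.util.Future
      System.err.println("job failed: " + e.getCause());
      return null;
    }
  }
}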
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c744399483..efb8978f7c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,12 +21,14 @@ import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable, Long => JLong}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
+import org.apache.spark._
+import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -294,8 +296,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to all elements of this RDD.
*/
def foreach(f: VoidFunction[T]) {
- val cleanF = rdd.context.clean((x: T) => f.call(x))
- rdd.foreach(cleanF)
+ rdd.foreach(x => f.call(x))
}
/**
@@ -576,16 +577,44 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name
/**
- * :: Experimental ::
- * The asynchronous version of the foreach action.
- *
- * @param f the function to apply to all the elements of the RDD
- * @return a FutureAction for the action
+ * The asynchronous version of `count`, which returns a
+ * future for counting the number of elements in this RDD.
*/
- @Experimental
- def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
- import org.apache.spark.SparkContext._
- rdd.foreachAsync(x => f.call(x))
+ def countAsync(): JavaFutureAction[JLong] = {
+ new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
+ }
+
+ /**
+ * The asynchronous version of `collect`, which returns a future for
+ * retrieving an array containing all of the elements in this RDD.
+ */
+ def collectAsync(): JavaFutureAction[JList[T]] = {
+ new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
+ }
+
+ /**
+ * The asynchronous version of the `take` action, which returns a
+ * future for retrieving the first `num` elements of this RDD.
+ */
+ def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
+ new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
}
+ /**
+ * The asynchronous version of the `foreach` action, which
+ * applies a function f to all the elements of this RDD.
+ */
+ def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
+ new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
+ { x => null.asInstanceOf[Void] })
+ }
+
+ /**
+ * The asynchronous version of the `foreachPartition` action, which
+ * applies a function f to each partition of this RDD.
+ */
+ def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+ new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
+ { x => null.asInstanceOf[Void] })
+ }
}
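Taken together, the new methods let a Java caller launch several actions on one RDD and track them independently. Another hypothetical sketch (the class, method, and rdd parameter are illustrative; VoidFunction is the existing org.apache.spark.api.java.function.VoidFunction interface):

import java.util.List;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

public final class AsyncActionsExample {
  public static void launch(JavaRDD<Integer> rdd) {
    JavaFutureAction<Void> printing = rdd.foreachAsync(new VoidFunction<Integer>() {
      @Override
      public void call(Integer x) {
        System.out.println(x);  // runs on the executors, once per element
      }
    });

    JavaFutureAction<List<Integer>> firstTen = rdd.takeAsync(10);

    // Each action exposes a read-only list of the ids of the Spark jobs it has
    // submitted so far; takeAsync may run several jobs as it scales up its scan.
    System.out.println("take(10) jobs so far: " + firstTen.jobIds());
  }
}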
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ede5568493..9f9f10b7eb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import org.apache.spark.annotation.Experimental
/**
- * :: Experimental ::
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
-@Experimental
class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
/**