 core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java | 33
 core/src/main/scala/org/apache/spark/FutureAction.scala            | 86
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala    | 53
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala     |  3
 core/src/test/java/org/apache/spark/JavaAPISuite.java              | 93
 project/MimaExcludes.scala                                         | 13
 6 files changed, 246 insertions(+), 35 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java b/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
new file mode 100644
index 0000000000..0ad189633e
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+public interface JavaFutureAction<T> extends Future<T> {
+
+ /**
+ * Returns the job IDs run by the underlying async operation.
+ *
+ * This returns the current snapshot of the job list. Certain operations may run multiple
+ * jobs, so multiple calls to this method may return different lists.
+ */
+ List<Integer> jobIds();
+}
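
For context, a JavaFutureAction is meant to be consumed like any java.util.concurrent.Future, with jobIds() as the one Spark-specific addition. Below is a minimal usage sketch under that assumption; the SparkConf setup, class name, and variable names are illustrative and not part of this patch.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaFutureAction;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class AsyncCountExample {
      public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("async-count").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Kick off the job without blocking the calling thread.
        JavaFutureAction<Long> future = rdd.countAsync();

        // jobIds() is a snapshot; it may grow for actions that run several jobs (e.g. take).
        List<Integer> jobs = future.jobIds();

        // get() blocks like a plain java.util.concurrent.Future.
        long count = future.get();
        System.out.println("count=" + count + ", jobs so far=" + jobs);
        sc.stop();
      }
    }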
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index e8f761eaa5..d5c8f9d76c 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,20 +17,21 @@
package org.apache.spark
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.Try
+import java.util.Collections
+import java.util.concurrent.TimeUnit
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Try}
+
/**
- * :: Experimental ::
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
-@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -70,6 +71,11 @@ trait FutureAction[T] extends Future[T] {
override def isCompleted: Boolean
/**
+ * Returns whether the action has been cancelled.
+ */
+ def isCancelled: Boolean
+
+ /**
* The value of this Future.
*
* If the future is not completed the returned value will be None. If the future is completed
@@ -96,15 +102,16 @@ trait FutureAction[T] extends Future[T] {
/**
- * :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
-@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {
+ @volatile private var _cancelled: Boolean = false
+
override def cancel() {
+ _cancelled = true
jobWaiter.cancel()
}
@@ -143,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
override def isCompleted: Boolean = jobWaiter.jobFinished
+
+ override def isCancelled: Boolean = _cancelled
override def value: Option[Try[T]] = {
if (jobWaiter.jobFinished) {
@@ -164,12 +173,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
/**
- * :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
-@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {
// Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +229,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
- if (!cancelled) {
+ if (!isCancelled) {
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
} else {
throw new SparkException("Action has been cancelled")
@@ -243,10 +250,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}
- /**
- * Returns whether the promise has been cancelled.
- */
- def cancelled: Boolean = _cancelled
+ override def isCancelled: Boolean = _cancelled
@throws(classOf[InterruptedException])
@throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +275,55 @@ class ComplexFutureAction[T] extends FutureAction[T] {
def jobIds = jobs
}
+
+private[spark]
+class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
+ extends JavaFutureAction[T] {
+
+ import scala.collection.JavaConverters._
+
+ override def isCancelled: Boolean = futureAction.isCancelled
+
+ override def isDone: Boolean = {
+ // According to java.util.Future's Javadoc, this returns true if the task was completed,
+ // whether that completion was due to successful execution, an exception, or a cancellation.
+ futureAction.isCancelled || futureAction.isCompleted
+ }
+
+ override def jobIds(): java.util.List[java.lang.Integer] = {
+ Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
+ }
+
+ private def getImpl(timeout: Duration): T = {
+ // This will throw TimeoutException on timeout:
+ Await.ready(futureAction, timeout)
+ futureAction.value.get match {
+ case scala.util.Success(value) => converter(value)
+ case Failure(exception) =>
+ if (isCancelled) {
+ throw new CancellationException("Job cancelled").initCause(exception)
+ } else {
+ // java.util.Future.get() wraps exceptions in ExecutionException
+ throw new ExecutionException("Exception thrown by job", exception)
+ }
+ }
+ }
+
+ override def get(): T = getImpl(Duration.Inf)
+
+ override def get(timeout: Long, unit: TimeUnit): T =
+ getImpl(Duration.fromNanos(unit.toNanos(timeout)))
+
+ override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
+ if (isDone) {
+ // According to java.util.Future's Javadoc, this should return false if the task is completed.
+ false
+ } else {
+ // We're limited in terms of the semantics we can provide here; our cancellation is
+ // asynchronous and doesn't provide a mechanism to not cancel if the job is running.
+ futureAction.cancel()
+ true
+ }
+ }
+
+}
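
The wrapper above maps FutureAction outcomes onto the standard Future contract: job failures surface from get() as ExecutionException with the Spark exception as the cause, and a cancelled action surfaces as CancellationException. A hedged sketch of the resulting caller-side handling follows; the helper name, RDD type, and timeout are illustrative only.

    import java.util.List;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.spark.api.java.JavaFutureAction;
    import org.apache.spark.api.java.JavaRDD;

    public class AsyncErrorHandlingExample {
      static List<Integer> collectOrExplain(JavaRDD<Integer> rdd) throws InterruptedException {
        JavaFutureAction<List<Integer>> action = rdd.collectAsync();
        try {
          return action.get(30, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
          // The underlying Spark job threw; the original exception is the cause.
          throw new RuntimeException("Job failed", e.getCause());
        } catch (CancellationException e) {
          // cancel(true) was called before the job completed.
          throw new RuntimeException("Job was cancelled", e);
        } catch (TimeoutException e) {
          // The wait timed out; the job may still be running.
          throw new RuntimeException("Job did not finish in time", e);
        }
      }
    }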
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c744399483..efb8978f7c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,12 +21,14 @@ import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable, Long => JLong}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
+import org.apache.spark._
+import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -294,8 +296,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to all elements of this RDD.
*/
def foreach(f: VoidFunction[T]) {
- val cleanF = rdd.context.clean((x: T) => f.call(x))
- rdd.foreach(cleanF)
+ rdd.foreach(x => f.call(x))
}
/**
@@ -576,16 +577,44 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name
/**
- * :: Experimental ::
- * The asynchronous version of the foreach action.
- *
- * @param f the function to apply to all the elements of the RDD
- * @return a FutureAction for the action
+ * The asynchronous version of `count`, which returns a
+ * future for counting the number of elements in this RDD.
*/
- @Experimental
- def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
- import org.apache.spark.SparkContext._
- rdd.foreachAsync(x => f.call(x))
+ def countAsync(): JavaFutureAction[JLong] = {
+ new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
+ }
+
+ /**
+ * The asynchronous version of `collect`, which returns a future for
+ * retrieving an array containing all of the elements in this RDD.
+ */
+ def collectAsync(): JavaFutureAction[JList[T]] = {
+ new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
+ }
+
+ /**
+ * The asynchronous version of the `take` action, which returns a
+ * future for retrieving the first `num` elements of this RDD.
+ */
+ def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
+ new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
}
+ /**
+ * The asynchronous version of the `foreach` action, which
+ * applies a function f to all the elements of this RDD.
+ */
+ def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
+ new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
+ { x => null.asInstanceOf[Void] })
+ }
+
+ /**
+ * The asynchronous version of the `foreachPartition` action, which
+ * applies a function f to each partition of this RDD.
+ */
+ def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+ new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
+ { x => null.asInstanceOf[Void] })
+ }
}
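
Since each of the new JavaRDDLike methods returns a JavaFutureAction, a long-running action can be cancelled from another thread through the usual Future API; as noted in the wrapper, cancellation is asynchronous and best-effort. A sketch under those assumptions (the sleep-based workload and class name are illustrative only):

    import java.util.Arrays;
    import java.util.concurrent.CancellationException;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaFutureAction;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;

    public class AsyncCancellationExample {
      public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("async-cancel").setMaster("local[2]"));
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 1);

        // A deliberately slow action so there is something to cancel.
        JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
          @Override
          public void call(Integer i) throws Exception {
            Thread.sleep(10000);
          }
        });

        future.cancel(true);          // asynchronous, best-effort cancellation
        try {
          future.get();               // a cancelled action reports CancellationException
        } catch (CancellationException expected) {
          System.out.println("cancelled: " + future.isCancelled());
        }
        sc.stop();
      }
    }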
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ede5568493..9f9f10b7eb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import org.apache.spark.annotation.Experimental
/**
- * :: Experimental ::
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
-@Experimental
class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
/**
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b8fa822ae4..3190148fb5 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -20,6 +20,7 @@ package org.apache.spark;
import java.io.*;
import java.net.URI;
import java.util.*;
+import java.util.concurrent.*;
import scala.Tuple2;
import scala.Tuple3;
@@ -29,6 +30,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.base.Throwables;
import com.google.common.base.Optional;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
@@ -43,10 +45,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.partial.BoundedDouble;
@@ -1308,6 +1307,92 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(data.size(), collected.length);
}
+ private static final class BuggyMapFunction<T> implements Function<T, T> {
+
+ @Override
+ public T call(T x) throws Exception {
+ throw new IllegalStateException("Custom exception!");
+ }
+ }
+
+ @Test
+ public void collectAsync() throws Exception {
+ List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+ JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+ JavaFutureAction<List<Integer>> future = rdd.collectAsync();
+ List<Integer> result = future.get();
+ Assert.assertEquals(data, result);
+ Assert.assertFalse(future.isCancelled());
+ Assert.assertTrue(future.isDone());
+ Assert.assertEquals(1, future.jobIds().size());
+ }
+
+ @Test
+ public void foreachAsync() throws Exception {
+ List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+ JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+ JavaFutureAction<Void> future = rdd.foreachAsync(
+ new VoidFunction<Integer>() {
+ @Override
+ public void call(Integer integer) throws Exception {
+ // intentionally left blank.
+ }
+ }
+ );
+ future.get();
+ Assert.assertFalse(future.isCancelled());
+ Assert.assertTrue(future.isDone());
+ Assert.assertEquals(1, future.jobIds().size());
+ }
+
+ @Test
+ public void countAsync() throws Exception {
+ List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+ JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+ JavaFutureAction<Long> future = rdd.countAsync();
+ long count = future.get();
+ Assert.assertEquals(data.size(), count);
+ Assert.assertFalse(future.isCancelled());
+ Assert.assertTrue(future.isDone());
+ Assert.assertEquals(1, future.jobIds().size());
+ }
+
+ @Test
+ public void testAsyncActionCancellation() throws Exception {
+ List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+ JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+ JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
+ @Override
+ public void call(Integer integer) throws Exception {
+ Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled.
+ }
+ });
+ future.cancel(true);
+ Assert.assertTrue(future.isCancelled());
+ Assert.assertTrue(future.isDone());
+ try {
+ future.get(2000, TimeUnit.MILLISECONDS);
+ Assert.fail("Expected future.get() for cancelled job to throw CancellationException");
+ } catch (CancellationException ignored) {
+ // pass
+ }
+ }
+
+ @Test
+ public void testAsyncActionErrorWrapping() throws Exception {
+ List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+ JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+ JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ Assert.fail("Expected future.get() for failed job to throw ExcecutionException");
+ } catch (ExecutionException ee) {
+ Assert.assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
+ }
+ Assert.assertTrue(future.isDone());
+ }
+
+
/**
* Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
* since that's the only artifact where Guava classes have been relocated.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 350aad4773..c58666af84 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,7 +54,18 @@ object MimaExcludes {
// TaskContext was promoted to Abstract class
ProblemFilters.exclude[AbstractClassProblem](
"org.apache.spark.TaskContext")
-
+ ) ++ Seq(
+ // Adding new methods to the JavaRDDLike trait:
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.takeAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.foreachPartitionAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.countAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.collectAsync")
)
case v if v.startsWith("1.1") =>