aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/FutureAction.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala2
-rw-r--r--project/MimaExcludes.scala3
5 files changed, 23 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 1e4dec86a0..75ea535f2f 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
case JobFailed(e: Exception) => scala.util.Failure(e)
}
}
+
+ /** Get the corresponding job id for this action. */
+ def jobId = jobWaiter.jobId
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f917cfd141..545bc0e9e9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name
+ /**
+ * :: Experimental ::
+ * The asynchronous version of the foreach action.
+ *
+ * @param f the function to apply to all the elements of the RDD
+ * @return a FutureAction for the action
+ */
+ @Experimental
+ def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
+ import org.apache.spark.SparkContext._
+ rdd.foreachAsync(x => f.call(x))
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index aed951a40b..b62f3fbdc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
* Applies a function f to all elements of this RDD.
*/
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
- self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size),
+ val cleanF = self.context.clean(f)
+ self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
(index, data) => Unit, Unit)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index e9bfee2248..29879b374b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -23,7 +23,7 @@ package org.apache.spark.scheduler
*/
private[spark] class JobWaiter[T](
dagScheduler: DAGScheduler,
- jobId: Int,
+ val jobId: Int,
totalTasks: Int,
resultHandler: (Int, T) => Unit)
extends JobListener {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fe8ffe6d97..a2f1b3582a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,6 +41,9 @@ object MimaExcludes {
Seq(
// Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
+ // Should probably mark this as Experimental
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
// We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
// for countApproxDistinct* functions, which does not work in Java. We later removed
// them, and use the following to tell Mima to not care about them.