author     lirui <rui.li@intel.com>          2014-09-01 23:28:19 -0700
committer  Reynold Xin <rxin@apache.org>     2014-09-01 23:28:19 -0700
commit     fbf2678c16acc0071ebd1cbdd165702635be5f0c (patch)
tree       3f4359d4c8f3136ac0b5f3cadae1073b7e5737a2
parent     44d3a6a75209370b1648e2dfedaa4c895923dae5 (diff)
SPARK-2636: Expose job ID in JobWaiter API
This PR adds async actions to the Java API. Users can call these async actions to get a FutureAction, and use the JobWaiter (for a SimpleFutureAction) to retrieve the job ID.

Author: lirui <rui.li@intel.com>

Closes #2176 from lirui-intel/SPARK-2636 and squashes the following commits:

ccaafb7 [lirui] SPARK-2636: fix java doc
5536d55 [lirui] SPARK-2636: mark the async API as experimental
e2e01d5 [lirui] SPARK-2636: add mima exclude
0ca320d [lirui] SPARK-2636: fix method name & javadoc
3fa39f7 [lirui] SPARK-2636: refine the patch
af4f5d9 [lirui] SPARK-2636: remove unused imports
843276c [lirui] SPARK-2636: only keep foreachAsync in the java API
fbf5744 [lirui] SPARK-2636: add more async actions for java api
1b25abc [lirui] SPARK-2636: expose some fields in JobWaiter
d09f732 [lirui] SPARK-2636: fix build
eb1ee79 [lirui] SPARK-2636: change some parameters in SimpleFutureAction to member field
6e2b87b [lirui] SPARK-2636: add java API for async actions
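As a hedged sketch of the intended usage (the local master URL, app name, and sample data are illustrative, not part of the patch): the new Java-side foreachAsync returns a FutureAction whose concrete type, SimpleFutureAction, now exposes the backing job's ID.

    import java.util.Arrays

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import org.apache.spark.SimpleFutureAction
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.api.java.function.VoidFunction

    object AsyncDemo {
      def main(args: Array[String]): Unit = {
        val jsc = new JavaSparkContext("local[2]", "async-demo")
        val rdd = jsc.parallelize(Arrays.asList[Integer](1, 2, 3, 4))

        // foreachAsync returns immediately with a FutureAction instead of blocking.
        val future = rdd.foreachAsync(new VoidFunction[Integer] {
          override def call(x: Integer): Unit = println(x)
        })

        // The returned action is concretely a SimpleFutureAction, which
        // (with this patch) exposes the id of the job backing the action.
        future match {
          case f: SimpleFutureAction[_] => println(s"running as job ${f.jobId}")
          case _ =>
        }

        Await.ready(future, Duration.Inf)  // block until the job completes
        jsc.stop()
      }
    }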
-rw-r--r--  core/src/main/scala/org/apache/spark/FutureAction.scala          |  3 +++
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala  | 15 ++++++++++++++-
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala   |  3 ++-
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala   |  2 +-
-rw-r--r--  project/MimaExcludes.scala                                       |  3 +++
5 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 1e4dec86a0..75ea535f2f 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
case JobFailed(e: Exception) => scala.util.Failure(e)
}
}
+
+ /** Get the corresponding job id for this action. */
+ def jobId = jobWaiter.jobId
}
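Since jobId lives on SimpleFutureAction rather than on the FutureAction trait, Scala callers reach it by matching on the concrete type. A minimal sketch, assuming a local master and illustrative data; rddToAsyncRDDActions comes from the SparkContext implicits in this era:

    import org.apache.spark.{SimpleFutureAction, SparkContext}
    import org.apache.spark.SparkContext._  // implicit rddToAsyncRDDActions

    object JobIdDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "jobid-demo")
        val future = sc.parallelize(1 to 100).countAsync()

        // countAsync is declared as FutureAction[Long]; the instance behind
        // it is a SimpleFutureAction, which now carries the submitted job's id.
        future match {
          case f: SimpleFutureAction[_] => println(s"count runs as job ${f.jobId}")
          case _ => println("not a SimpleFutureAction")
        }

        sc.stop()
      }
    }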
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f917cfd141..545bc0e9e9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name
+ /**
+ * :: Experimental ::
+ * The asynchronous version of the foreach action.
+ *
+ * @param f the function to apply to all the elements of the RDD
+ * @return a FutureAction for the action
+ */
+ @Experimental
+ def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
+ import org.apache.spark.SparkContext._
+ rdd.foreachAsync(x => f.call(x))
+ }
+
}
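The import inside foreachAsync pulls in the implicit conversion that equips the wrapped Scala RDD with its async actions; user code relies on the same mechanism. A small sketch under the same illustrative assumptions as above:

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // rddToAsyncRDDActions, among others

    object ImplicitDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "implicit-demo")

        // Plain RDDs do not define foreachAsync; the implicit above wraps the
        // RDD in AsyncRDDActions, which is where the method actually lives.
        val future = sc.parallelize(Seq("a", "b", "c")).foreachAsync(println(_))

        Await.ready(future, Duration.Inf)
        sc.stop()
      }
    }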
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index aed951a40b..b62f3fbdc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
* Applies a function f to all elements of this RDD.
*/
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
- self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size),
+ val cleanF = self.context.clean(f)
+ self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
(index, data) => Unit, Unit)
}
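The added clean(f) call runs the closure through Spark's ClosureCleaner before the job is submitted, matching what the synchronous foreach already does: unused references to enclosing scopes are severed and serializability is checked up front, so a bad closure fails at submission rather than at task launch. A hedged sketch of the situation this guards against (class and field names are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    class Outer(sc: SparkContext) {  // Outer itself is not Serializable
      val heavyState = new Object    // would poison the closure if captured

      def submit() = {
        // The closure uses none of Outer's state, so cleaning lets Spark sever
        // any compiler-inserted reference to `this` and fail fast if, after
        // that, the closure still cannot be serialized.
        sc.parallelize(1 to 3).foreachAsync(x => println(x))
      }
    }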
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index e9bfee2248..29879b374b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -23,7 +23,7 @@ package org.apache.spark.scheduler
*/
private[spark] class JobWaiter[T](
dagScheduler: DAGScheduler,
- jobId: Int,
+ val jobId: Int,
totalTasks: Int,
resultHandler: (Int, T) => Unit)
extends JobListener {
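The one-word change above does real work: in Scala, prefixing a constructor parameter with val turns it into a field with a generated public accessor, which is what lets SimpleFutureAction read jobWaiter.jobId. A minimal illustration:

    class WithVal(val jobId: Int)  // new WithVal(7).jobId == 7: accessor generated
    class Plain(jobId: Int)        // new Plain(7).jobId does not compile: constructor arg only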
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fe8ffe6d97..a2f1b3582a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,6 +41,9 @@ object MimaExcludes {
Seq(
// Adding new method to JavaRDDLike trait - we should probably mark this as a developer API.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
+ // Should probably mark this as Experimental
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
// We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
// for countApproxDistinct* functions, which does not work in Java. We later removed
// them, and use the following to tell Mima to not care about them.