diff options
author | lirui <rui.li@intel.com> | 2014-09-01 23:28:19 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-09-01 23:28:19 -0700 |
commit | fbf2678c16acc0071ebd1cbdd165702635be5f0c (patch) | |
tree | 3f4359d4c8f3136ac0b5f3cadae1073b7e5737a2 | |
parent | 44d3a6a75209370b1648e2dfedaa4c895923dae5 (diff) | |
download | spark-fbf2678c16acc0071ebd1cbdd165702635be5f0c.tar.gz spark-fbf2678c16acc0071ebd1cbdd165702635be5f0c.tar.bz2 spark-fbf2678c16acc0071ebd1cbdd165702635be5f0c.zip |
SPARK-2636: Expose job ID in JobWaiter API
This PR adds the async actions to the Java API. User can call these async actions to get the FutureAction and use JobWaiter (for SimpleFutureAction) to retrieve job Id.
Author: lirui <rui.li@intel.com>
Closes #2176 from lirui-intel/SPARK-2636 and squashes the following commits:
ccaafb7 [lirui] SPARK-2636: fix java doc
5536d55 [lirui] SPARK-2636: mark the async API as experimental
e2e01d5 [lirui] SPARK-2636: add mima exclude
0ca320d [lirui] SPARK-2636: fix method name & javadoc
3fa39f7 [lirui] SPARK-2636: refine the patch
af4f5d9 [lirui] SPARK-2636: remove unused imports
843276c [lirui] SPARK-2636: only keep foreachAsync in the java API
fbf5744 [lirui] SPARK-2636: add more async actions for java api
1b25abc [lirui] SPARK-2636: expose some fields in JobWaiter
d09f732 [lirui] SPARK-2636: fix build
eb1ee79 [lirui] SPARK-2636: change some parameters in SimpleFutureAction to member field
6e2b87b [lirui] SPARK-2636: add java API for async actions
5 files changed, 23 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 1e4dec86a0..75ea535f2f 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: case JobFailed(e: Exception) => scala.util.Failure(e) } } + + /** Get the corresponding job id for this action. */ + def jobId = jobWaiter.jobId } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index f917cfd141..545bc0e9e9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag @@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name + /** + * :: Experimental :: + * The asynchronous version of the foreach action. + * + * @param f the function to apply to all the elements of the RDD + * @return a FutureAction for the action + */ + @Experimental + def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { + import org.apache.spark.SparkContext._ + rdd.foreachAsync(x => f.call(x)) + } + } diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index aed951a40b..b62f3fbdc4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi * Applies a function f to all elements of this RDD. */ def foreachAsync(f: T => Unit): FutureAction[Unit] = { - self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size), + val cleanF = self.context.clean(f) + self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size), (index, data) => Unit, Unit) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index e9bfee2248..29879b374b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -23,7 +23,7 @@ package org.apache.spark.scheduler */ private[spark] class JobWaiter[T]( dagScheduler: DAGScheduler, - jobId: Int, + val jobId: Int, totalTasks: Int, resultHandler: (Int, T) => Unit) extends JobListener { diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index fe8ffe6d97..a2f1b3582a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,6 +41,9 @@ object MimaExcludes { Seq( // Adding new method to JavaRDLike trait - we should probably mark this as a developer API. ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"), + // Should probably mark this as Experimental + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.foreachAsync"), // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values // for countApproxDistinct* functions, which does not work in Java. We later removed // them, and use the following to tell Mima to not care about them. |