diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-06-10 22:09:13 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-06-10 22:09:13 -0700 |
commit | 359e84c585101b02264455bd84a01bf00c8623d7 (patch) | |
tree | f212bb64a0421bea0a11884b0ace881da735d3a3 /src | |
parent | 92246c843bb63fb9f1da4978bfffe0b7f9192afd (diff) | |
download | spark-359e84c585101b02264455bd84a01bf00c8623d7.tar.gz spark-359e84c585101b02264455bd84a01bf00c8623d7.tar.bz2 spark-359e84c585101b02264455bd84a01bf00c8623d7.zip |
Use new Nexus API
Diffstat (limited to 'src')
-rw-r--r-- | src/scala/spark/Executor.scala | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala index 1f7f6f32fa..c7ee4e594d 100644 --- a/src/scala/spark/Executor.scala +++ b/src/scala/spark/Executor.scala @@ -2,7 +2,8 @@ package spark import java.util.concurrent.{Executors, ExecutorService} -import nexus.{ExecutorArgs, TaskDescription, TaskState, TaskStatus} +import nexus.{ExecutorArgs, ExecutorDriver, NexusExecutorDriver} +import nexus.{TaskDescription, TaskState, TaskStatus} object Executor { def main(args: Array[String]) { @@ -12,7 +13,7 @@ object Executor { var classLoader: ClassLoader = null var threadPool: ExecutorService = null - override def init(args: ExecutorArgs) { + override def init(d: ExecutorDriver, args: ExecutorArgs) { // Read spark.* system properties val props = Utils.deserialize[Array[(String, String)]](args.getData) for ((key, value) <- props) @@ -37,7 +38,7 @@ object Executor { threadPool = Executors.newCachedThreadPool() } - override def startTask(desc: TaskDescription) { + override def launchTask(d: ExecutorDriver, desc: TaskDescription) { // Pull taskId and arg out of TaskDescription because it won't be a // valid pointer after this method call (TODO: fix this in C++/SWIG) val taskId = desc.getTaskId @@ -51,7 +52,7 @@ object Executor { val value = task.run val accumUpdates = Accumulators.values val result = new TaskResult(value, accumUpdates) - sendStatusUpdate(new TaskStatus( + d.sendStatusUpdate(new TaskStatus( taskId, TaskState.TASK_FINISHED, Utils.serialize(result))) println("Finished task ID " + taskId) } catch { @@ -67,6 +68,6 @@ object Executor { } } - exec.run() + new NexusExecutorDriver(exec).run() } } |