aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-06-10 22:09:13 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-06-10 22:09:13 -0700
commit359e84c585101b02264455bd84a01bf00c8623d7 (patch)
treef212bb64a0421bea0a11884b0ace881da735d3a3 /src
parent92246c843bb63fb9f1da4978bfffe0b7f9192afd (diff)
downloadspark-359e84c585101b02264455bd84a01bf00c8623d7.tar.gz
spark-359e84c585101b02264455bd84a01bf00c8623d7.tar.bz2
spark-359e84c585101b02264455bd84a01bf00c8623d7.zip
Use new Nexus API
Diffstat (limited to 'src')
-rw-r--r--src/scala/spark/Executor.scala11
1 files changed, 6 insertions, 5 deletions
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index 1f7f6f32fa..c7ee4e594d 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -2,7 +2,8 @@ package spark
import java.util.concurrent.{Executors, ExecutorService}
-import nexus.{ExecutorArgs, TaskDescription, TaskState, TaskStatus}
+import nexus.{ExecutorArgs, ExecutorDriver, NexusExecutorDriver}
+import nexus.{TaskDescription, TaskState, TaskStatus}
object Executor {
def main(args: Array[String]) {
@@ -12,7 +13,7 @@ object Executor {
var classLoader: ClassLoader = null
var threadPool: ExecutorService = null
- override def init(args: ExecutorArgs) {
+ override def init(d: ExecutorDriver, args: ExecutorArgs) {
// Read spark.* system properties
val props = Utils.deserialize[Array[(String, String)]](args.getData)
for ((key, value) <- props)
@@ -37,7 +38,7 @@ object Executor {
threadPool = Executors.newCachedThreadPool()
}
- override def startTask(desc: TaskDescription) {
+ override def launchTask(d: ExecutorDriver, desc: TaskDescription) {
// Pull taskId and arg out of TaskDescription because it won't be a
// valid pointer after this method call (TODO: fix this in C++/SWIG)
val taskId = desc.getTaskId
@@ -51,7 +52,7 @@ object Executor {
val value = task.run
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
- sendStatusUpdate(new TaskStatus(
+ d.sendStatusUpdate(new TaskStatus(
taskId, TaskState.TASK_FINISHED, Utils.serialize(result)))
println("Finished task ID " + taskId)
} catch {
@@ -67,6 +68,6 @@ object Executor {
}
}
- exec.run()
+ new NexusExecutorDriver(exec).run()
}
}