author     root <root@ip-10-70-173-164.ec2.internal>  2011-05-24 22:27:08 +0000
committer  root <root@ip-10-70-173-164.ec2.internal>  2011-05-24 22:27:08 +0000
commit     5ef938615febe23fbc01e9524f656e692d45736d (patch)
tree       3abb225d923f37b55110053abede40f755fc8e5d
parent     cec427e777fe2d6ef0dab285a1f4289d2ae4f89e (diff)
Initial work on making stuff compile with protobuf Mesos
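Most of this patch is a mechanical port from the old mesos.* Java API, whose classes were constructed directly and carried raw byte arrays, to the protobuf-based org.apache.mesos.Protos API, whose messages are assembled with builders and carry ByteString payloads. A minimal sketch of the new status-update pattern, using only calls that appear in the diff below (the sketch itself is not part of the patch):

import com.google.protobuf.ByteString
import org.apache.mesos.Protos._

// Old API: d.sendStatusUpdate(new TaskStatus(taskId, TaskState.TASK_FINISHED, bytes))
// New API: assemble an immutable TaskStatus message, then hand it to the driver.
def reportFinished(d: org.apache.mesos.ExecutorDriver,
                   task: TaskDescription,
                   resultBytes: Array[Byte]) {
  val status = TaskStatus.newBuilder()
    .setTaskId(task.getTaskId)              // a TaskID message now, not a plain Int
    .setSlaveId(task.getSlaveId)
    .setState(TaskState.TASK_FINISHED)
    .setData(ByteString.copyFrom(resultBytes))
    .build()
  d.sendStatusUpdate(status)
}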
-rw-r--r--  core/lib/mesos.jar                              | bin 36686 -> 132297 bytes
-rw-r--r--  core/lib/protobuf-2.3.0.jar                     | bin 0 -> 444422 bytes
-rw-r--r--  core/src/main/scala/spark/Executor.scala        |  58
-rw-r--r--  core/src/main/scala/spark/HadoopFile.scala      |   2
-rw-r--r--  core/src/main/scala/spark/Job.scala             |   5
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala  |  68
-rw-r--r--  core/src/main/scala/spark/ParallelArray.scala   |   2
-rw-r--r--  core/src/main/scala/spark/RDD.scala             |   1
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala       |  52
-rw-r--r--  core/src/main/scala/spark/Task.scala            |   2
-rwxr-xr-x  run                                             |   1
11 files changed, 124 insertions(+), 67 deletions(-)
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
index eb01ce8a15..921149edae 100644
--- a/core/lib/mesos.jar
+++ b/core/lib/mesos.jar
Binary files differ
diff --git a/core/lib/protobuf-2.3.0.jar b/core/lib/protobuf-2.3.0.jar
new file mode 100644
index 0000000000..b3d4056407
--- /dev/null
+++ b/core/lib/protobuf-2.3.0.jar
Binary files differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index e534f48879..5ea8914bd0 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -6,20 +6,22 @@ import java.util.concurrent._
import scala.collection.mutable.ArrayBuffer
-import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
-import mesos.{TaskDescription, TaskState, TaskStatus}
+import com.google.protobuf.ByteString
+
+import org.apache.mesos._
+import org.apache.mesos.Protos._
/**
* The Mesos executor for Spark.
*/
-class Executor extends mesos.Executor with Logging {
+class Executor extends org.apache.mesos.Executor with Logging {
var classLoader: ClassLoader = null
var threadPool: ExecutorService = null
var env: SparkEnv = null
override def init(d: ExecutorDriver, args: ExecutorArgs) {
// Read spark.* system properties from executor arg
- val props = Utils.deserialize[Array[(String, String)]](args.getData)
+ val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
for ((key, value) <- props)
System.setProperty(key, value)
@@ -38,40 +40,46 @@ class Executor extends mesos.Executor with Logging {
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
}
- override def launchTask(d: ExecutorDriver, desc: TaskDescription) {
- // Pull taskId and arg out of TaskDescription because it won't be a
- // valid pointer after this method call (TODO: fix this in C++/SWIG)
- val taskId = desc.getTaskId
- val arg = desc.getArg
- threadPool.execute(new TaskRunner(taskId, arg, d))
+ override def launchTask(d: ExecutorDriver, task: TaskDescription) {
+ threadPool.execute(new TaskRunner(task, d))
}
- class TaskRunner(taskId: Int, arg: Array[Byte], d: ExecutorDriver)
+ class TaskRunner(desc: TaskDescription, d: ExecutorDriver)
extends Runnable {
override def run() = {
- logInfo("Running task ID " + taskId)
+ logInfo("Running task ID " + desc.getTaskId)
try {
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(classLoader)
Accumulators.clear
- val task = Utils.deserialize[Task[Any]](arg, classLoader)
+ val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
for (gen <- task.generation) // Update generation if any is set
env.mapOutputTracker.updateGeneration(gen)
val value = task.run
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
- d.sendStatusUpdate(new TaskStatus(
- taskId, TaskState.TASK_FINISHED, Utils.serialize(result)))
- logInfo("Finished task ID " + taskId)
+ val status = TaskStatus.newBuilder()
+ .setTaskId(desc.getTaskId)
+ .setSlaveId(desc.getSlaveId)
+ .setState(TaskState.TASK_FINISHED)
+ .setData(ByteString.copyFrom(Utils.serialize(result)))
+ .build()
+ d.sendStatusUpdate(status)
+ logInfo("Finished task ID " + desc.getTaskId)
} catch {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
- d.sendStatusUpdate(new TaskStatus(
- taskId, TaskState.TASK_FAILED, Utils.serialize(reason)))
+ val status = TaskStatus.newBuilder()
+ .setTaskId(desc.getTaskId)
+ .setSlaveId(desc.getSlaveId)
+ .setState(TaskState.TASK_FAILED)
+ .setData(ByteString.copyFrom(Utils.serialize(reason)))
+ .build()
+ d.sendStatusUpdate(status)
}
case e: Exception => {
// TODO: Handle errors in tasks less dramatically
- logError("Exception in task ID " + taskId, e)
+ logError("Exception in task ID " + desc.getTaskId, e)
System.exit(1)
}
}
@@ -117,6 +125,18 @@ class Executor extends mesos.Executor with Logging {
val out = new FileOutputStream(localPath)
Utils.copyStream(in, out, true)
}
+
+ override def error(d: ExecutorDriver, code: Int, message: String) {
+ logError("Error from Mesos: %s (code %d)".format(message, code))
+ }
+
+ override def killTask(d: ExecutorDriver, tid: TaskID) {
+ logWarning("Mesos asked us to kill task " + tid + "; ignoring (not yet implemented)")
+ }
+
+ override def shutdown(d: ExecutorDriver) {}
+
+ override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
}
/**
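A recurring detail in Executor.scala above: task arguments and results now travel in protobuf ByteString fields instead of raw Array[Byte], so every call into Spark's own serializer picks up a ByteString.copyFrom / toByteArray conversion. A sketch of that round trip, assuming this repository's Utils.serialize and Utils.deserialize helpers (the sketch is illustrative, not part of the patch):

import com.google.protobuf.ByteString

// Spark object -> bytes for a protobuf data field (as done for TaskResult above)
def toData[T](obj: T): ByteString =
  ByteString.copyFrom(Utils.serialize(obj))

// protobuf data field -> Spark object, deserialized with the executor's
// class loader (as TaskRunner does for the incoming Task)
def fromData[T](data: ByteString, loader: ClassLoader): T =
  Utils.deserialize[T](data.toByteArray, loader)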
diff --git a/core/src/main/scala/spark/HadoopFile.scala b/core/src/main/scala/spark/HadoopFile.scala
index 0a7996c7bd..31bebd43db 100644
--- a/core/src/main/scala/spark/HadoopFile.scala
+++ b/core/src/main/scala/spark/HadoopFile.scala
@@ -1,7 +1,5 @@
package spark
-import mesos.SlaveOffer
-
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileInputFormat
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index 6abbcbce51..9d75c08398 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -1,13 +1,14 @@
package spark
-import mesos._
+import org.apache.mesos._
+import org.apache.mesos.Protos._
/**
* Class representing a parallel job in MesosScheduler. Schedules the
* job by implementing various callbacks.
*/
abstract class Job(jobId: Int) {
- def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int)
+ def slaveOffer(s: SlaveOffer, availableCpus: Double, availableMem: Double)
: Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index d635e95dba..393a33af8c 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -12,8 +12,11 @@ import scala.collection.mutable.Map
import scala.collection.mutable.Queue
import scala.collection.JavaConversions._
-import mesos.{Scheduler => MScheduler}
-import mesos._
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos._
/**
* The main Scheduler implementation, which runs jobs on Mesos. Clients should
@@ -37,8 +40,8 @@ extends MScheduler with DAGScheduler with Logging
private var activeJobs = new HashMap[Int, Job]
private var activeJobsQueue = new Queue[Job]
- private var taskIdToJobId = new HashMap[Int, Int]
- private var jobTasks = new HashMap[Int, HashSet[Int]]
+ private var taskIdToJobId = new HashMap[String, Int]
+ private var jobTasks = new HashMap[Int, HashSet[String]]
// Incrementing job and task IDs
private var nextJobId = 0
@@ -59,10 +62,10 @@ extends MScheduler with DAGScheduler with Logging
return id
}
- def newTaskId(): Int = {
- val id = nextTaskId;
+ def newTaskId(): TaskID = {
+ val id = "" + nextTaskId;
nextTaskId += 1;
- return id
+ return TaskID.newBuilder().setValue(id).build()
}
override def start() {
@@ -92,13 +95,21 @@ extends MScheduler with DAGScheduler with Logging
"or the SparkContext constructor")
}
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
- val params = new JHashMap[String, String]
+ val params = Params.newBuilder()
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
if (System.getenv(key) != null) {
- params("env." + key) = System.getenv(key)
+ params.addParam(Param.newBuilder()
+ .setKey("env." + key)
+ .setValue(System.getenv(key))
+ .build())
}
}
- new ExecutorInfo(execScript, createExecArg(), params)
+ ExecutorInfo.newBuilder()
+ .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+ .setUri(execScript)
+ .setData(ByteString.copyFrom(createExecArg()))
+ .setParams(params.build())
+ .build()
}
@@ -125,7 +136,7 @@ extends MScheduler with DAGScheduler with Logging
}
}
- override def registered(d: SchedulerDriver, frameworkId: String) {
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID) {
logInfo("Registered as framework ID " + frameworkId)
registeredLock.synchronized {
isRegistered = true
@@ -146,11 +157,11 @@ extends MScheduler with DAGScheduler with Logging
* a round-robin manner so that tasks are balanced across the cluster.
*/
override def resourceOffer(
- d: SchedulerDriver, oid: String, offers: JList[SlaveOffer]) {
+ d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
synchronized {
val tasks = new JArrayList[TaskDescription]
- val availableCpus = offers.map(_.getParams.get("cpus").toInt)
- val availableMem = offers.map(_.getParams.get("mem").toInt)
+ val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
+ val availableMem = offers.map(o => getResource(o.getResourcesList(), "mem"))
var launchedTask = false
for (job <- activeJobsQueue) {
do {
@@ -160,10 +171,10 @@ extends MScheduler with DAGScheduler with Logging
job.slaveOffer(offers(i), availableCpus(i), availableMem(i)) match {
case Some(task) =>
tasks.add(task)
- taskIdToJobId(task.getTaskId) = job.getId
- jobTasks(job.getId) += task.getTaskId
- availableCpus(i) -= task.getParams.get("cpus").toInt
- availableMem(i) -= task.getParams.get("mem").toInt
+ taskIdToJobId(task.getTaskId.getValue) = job.getId
+ jobTasks(job.getId) += task.getTaskId.getValue
+ availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
+ availableMem(i) -= getResource(task.getResourcesList(), "mem")
launchedTask = true
case None => {}
}
@@ -179,6 +190,13 @@ extends MScheduler with DAGScheduler with Logging
}
}
+ // Helper function to pull out a resource from a Mesos Resources protobuf
+ def getResource(res: JList[Resource], name: String): Double = {
+ for (r <- res if r.getName == name)
+ return r.getScalar.getValue
+ throw new IllegalArgumentException("No resource called " + name + " in " + res)
+ }
+
// Check whether a Mesos task state represents a finished task
def isFinished(state: TaskState) = {
state == TaskState.TASK_FINISHED ||
@@ -190,18 +208,18 @@ extends MScheduler with DAGScheduler with Logging
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
synchronized {
try {
- taskIdToJobId.get(status.getTaskId) match {
+ taskIdToJobId.get(status.getTaskId.getValue) match {
case Some(jobId) =>
if (activeJobs.contains(jobId)) {
activeJobs(jobId).statusUpdate(status)
}
if (isFinished(status.getState)) {
- taskIdToJobId.remove(status.getTaskId)
+ taskIdToJobId.remove(status.getTaskId.getValue)
if (jobTasks.contains(jobId))
- jobTasks(jobId) -= status.getTaskId
+ jobTasks(jobId) -= status.getTaskId.getValue
}
case None =>
- logInfo("Ignoring update from TID " + status.getTaskId +
+ logInfo("Ignoring update from TID " + status.getTaskId.getValue +
" because its job is gone")
}
} catch {
@@ -293,4 +311,10 @@ extends MScheduler with DAGScheduler with Logging
// Serialize the map as an array of (String, String) pairs
return Utils.serialize(props.toArray)
}
+
+ override def frameworkMessage(d: SchedulerDriver, s: SlaveID, e: ExecutorID, b: Array[Byte]) {}
+
+ override def slaveLost(d: SchedulerDriver, s: SlaveID) {}
+
+ override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
}
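The scheduler-side half of the same change: offers no longer expose a string params map, so cpus and mem are read from each offer's typed Resource list, and the amounts are doubles rather than ints. A sketch of that lookup, in the spirit of the getResource helper added above and under the same imports as MesosScheduler.scala (not part of the patch):

import scala.collection.JavaConversions._
import org.apache.mesos.Protos._

// Look up a named scalar resource (e.g. "cpus" or "mem") in a slave offer.
def resourceValue(offer: SlaveOffer, name: String): Double =
  offer.getResourcesList()
    .find(_.getName == name)
    .map(_.getScalar.getValue)
    .getOrElse(throw new IllegalArgumentException(
      "No resource called " + name + " in " + offer))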
diff --git a/core/src/main/scala/spark/ParallelArray.scala b/core/src/main/scala/spark/ParallelArray.scala
index e77bc3014f..ad230833b8 100644
--- a/core/src/main/scala/spark/ParallelArray.scala
+++ b/core/src/main/scala/spark/ParallelArray.scala
@@ -1,7 +1,5 @@
package spark
-import mesos.SlaveOffer
-
import java.util.concurrent.atomic.AtomicLong
@serializable class ParallelArraySplit[T: ClassManifest](
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6334896cb6..56b32edf66 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -13,7 +13,6 @@ import scala.collection.mutable.HashMap
import SparkContext._
-import mesos._
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 00ff21369b..2961561f34 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -5,7 +5,10 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import mesos._
+import com.google.protobuf.ByteString
+
+import org.apache.mesos._
+import org.apache.mesos.Protos._
/**
@@ -19,8 +22,8 @@ extends Job(jobId) with Logging
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
// CPUs and memory to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
- val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toInt
+ val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+ val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toDouble
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4
@@ -31,7 +34,7 @@ extends Job(jobId) with Logging
val launched = new Array[Boolean](numTasks)
val finished = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
- val tidToIndex = HashMap[Int, Int]()
+ val tidToIndex = HashMap[String, Int]()
var tasksLaunched = 0
var tasksFinished = 0
@@ -126,13 +129,13 @@ extends Job(jobId) with Logging
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int)
+ def slaveOffer(offer: SlaveOffer, availableCpus: Double, availableMem: Double)
: Option[TaskDescription] = {
if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK &&
availableMem >= MEM_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
- val host = offer.getHost
+ val host = offer.getHostname
findTask(host, localOnly) match {
case Some(index) => {
// Found a task; do some bookkeeping and return a Mesos task for it
@@ -146,20 +149,35 @@ extends Job(jobId) with Logging
jobId, index, taskId, offer.getSlaveId, host, prefStr)
logInfo(message)
// Do various bookkeeping
- tidToIndex(taskId) = index
+ tidToIndex(taskId.getValue) = index
launched(index) = true
tasksLaunched += 1
if (preferred)
lastPreferredLaunchTime = time
// Create and return the Mesos task object
- val params = new JHashMap[String, String]
- params.put("cpus", CPUS_PER_TASK.toString)
- params.put("mem", MEM_PER_TASK.toString)
+ val cpuRes = Resource.newBuilder()
+ .setName("cpus")
+ .setType(Resource.Type.SCALAR)
+ .setScalar(Resource.Scalar.newBuilder()
+ .setValue(CPUS_PER_TASK).build())
+ .build()
+ val memRes = Resource.newBuilder()
+ .setName("mem")
+ .setType(Resource.Type.SCALAR)
+ .setScalar(Resource.Scalar.newBuilder()
+ .setValue(MEM_PER_TASK).build())
+ .build()
val serializedTask = Utils.serialize(task)
logDebug("Serialized size: " + serializedTask.size)
val taskName = "task %d:%d".format(jobId, index)
- return Some(new TaskDescription(
- taskId, offer.getSlaveId, taskName, params, serializedTask))
+ return Some(TaskDescription.newBuilder()
+ .setTaskId(taskId)
+ .setSlaveId(offer.getSlaveId)
+ .setName(taskName)
+ .addResources(cpuRes)
+ .addResources(memRes)
+ .setData(ByteString.copyFrom(serializedTask))
+ .build())
}
case _ =>
}
@@ -183,13 +201,13 @@ extends Job(jobId) with Logging
def taskFinished(status: TaskStatus) {
val tid = status.getTaskId
- val index = tidToIndex(tid)
+ val index = tidToIndex(tid.getValue)
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished TID %d (progress: %d/%d)".format(
tid, tasksFinished, numTasks))
// Deserialize task result
- val result = Utils.deserialize[TaskResult[_]](status.getData)
+ val result = Utils.deserialize[TaskResult[_]](status.getData.toByteArray)
sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks
finished(index) = true
@@ -203,15 +221,15 @@ extends Job(jobId) with Logging
def taskLost(status: TaskStatus) {
val tid = status.getTaskId
- val index = tidToIndex(tid)
+ val index = tidToIndex(tid.getValue)
if (!finished(index)) {
logInfo("Lost TID %d (task %d:%d)".format(tid, jobId, index))
launched(index) = false
tasksLaunched -= 1
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
- if (status.getData != null && status.getData.length > 0) {
- val reason = Utils.deserialize[TaskEndReason](status.getData)
+ if (status.getData != null && status.getData.size > 0) {
+ val reason = Utils.deserialize[TaskEndReason](status.getData.toByteArray)
reason match {
case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.serverUri)
diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala
index 4771bdd0f2..a01eb7ba0b 100644
--- a/core/src/main/scala/spark/Task.scala
+++ b/core/src/main/scala/spark/Task.scala
@@ -1,7 +1,5 @@
package spark
-import mesos._
-
@serializable
abstract class Task[T] {
def run: T
diff --git a/run b/run
index d3346f53e7..d4c731e56a 100755
--- a/run
+++ b/run
@@ -57,6 +57,7 @@ CLASSPATH+=:$CORE_DIR/lib/apache-log4j-1.2.16/log4j-1.2.16.jar
CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-api-1.6.1.jar
CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
CLASSPATH+=:$CORE_DIR/lib/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
+CLASSPATH+=:$CORE_DIR/lib/protobuf-2.3.0.jar
CLASSPATH+=:$EXAMPLES_DIR/target/scala_2.8.1/classes
for jar in $CORE_DIR/lib/hadoop-0.20.2/lib/*.jar; do
CLASSPATH+=:$jar