diff options
author | root <root@ip-10-70-173-164.ec2.internal> | 2011-05-24 22:27:08 +0000 |
---|---|---|
committer | root <root@ip-10-70-173-164.ec2.internal> | 2011-05-24 22:27:08 +0000 |
commit | 5ef938615febe23fbc01e9524f656e692d45736d (patch) | |
tree | 3abb225d923f37b55110053abede40f755fc8e5d /core | |
parent | cec427e777fe2d6ef0dab285a1f4289d2ae4f89e (diff) | |
download | spark-5ef938615febe23fbc01e9524f656e692d45736d.tar.gz spark-5ef938615febe23fbc01e9524f656e692d45736d.tar.bz2 spark-5ef938615febe23fbc01e9524f656e692d45736d.zip |
Initial work on making stuff compile with protobuf Mesos
Diffstat (limited to 'core')
-rw-r--r-- | core/lib/mesos.jar | bin | 36686 -> 132297 bytes | |||
-rw-r--r-- | core/lib/protobuf-2.3.0.jar | bin | 0 -> 444422 bytes | |||
-rw-r--r-- | core/src/main/scala/spark/Executor.scala | 58 | ||||
-rw-r--r-- | core/src/main/scala/spark/HadoopFile.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/Job.scala | 5 | ||||
-rw-r--r-- | core/src/main/scala/spark/MesosScheduler.scala | 68 | ||||
-rw-r--r-- | core/src/main/scala/spark/ParallelArray.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 1 | ||||
-rw-r--r-- | core/src/main/scala/spark/SimpleJob.scala | 52 | ||||
-rw-r--r-- | core/src/main/scala/spark/Task.scala | 2 |
10 files changed, 123 insertions, 67 deletions
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar Binary files differindex eb01ce8a15..921149edae 100644 --- a/core/lib/mesos.jar +++ b/core/lib/mesos.jar diff --git a/core/lib/protobuf-2.3.0.jar b/core/lib/protobuf-2.3.0.jar Binary files differnew file mode 100644 index 0000000000..b3d4056407 --- /dev/null +++ b/core/lib/protobuf-2.3.0.jar diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index e534f48879..5ea8914bd0 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -6,20 +6,22 @@ import java.util.concurrent._ import scala.collection.mutable.ArrayBuffer -import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver} -import mesos.{TaskDescription, TaskState, TaskStatus} +import com.google.protobuf.ByteString + +import org.apache.mesos._ +import org.apache.mesos.Protos._ /** * The Mesos executor for Spark. */ -class Executor extends mesos.Executor with Logging { +class Executor extends org.apache.mesos.Executor with Logging { var classLoader: ClassLoader = null var threadPool: ExecutorService = null var env: SparkEnv = null override def init(d: ExecutorDriver, args: ExecutorArgs) { // Read spark.* system properties from executor arg - val props = Utils.deserialize[Array[(String, String)]](args.getData) + val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray) for ((key, value) <- props) System.setProperty(key, value) @@ -38,40 +40,46 @@ class Executor extends mesos.Executor with Logging { 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) } - override def launchTask(d: ExecutorDriver, desc: TaskDescription) { - // Pull taskId and arg out of TaskDescription because it won't be a - // valid pointer after this method call (TODO: fix this in C++/SWIG) - val taskId = desc.getTaskId - val arg = desc.getArg - threadPool.execute(new TaskRunner(taskId, arg, d)) + override def launchTask(d: ExecutorDriver, task: TaskDescription) { + threadPool.execute(new TaskRunner(task, d)) } - class TaskRunner(taskId: Int, arg: Array[Byte], d: ExecutorDriver) + class TaskRunner(desc: TaskDescription, d: ExecutorDriver) extends Runnable { override def run() = { - logInfo("Running task ID " + taskId) + logInfo("Running task ID " + desc.getTaskId) try { SparkEnv.set(env) Thread.currentThread.setContextClassLoader(classLoader) Accumulators.clear - val task = Utils.deserialize[Task[Any]](arg, classLoader) + val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader) for (gen <- task.generation) // Update generation if any is set env.mapOutputTracker.updateGeneration(gen) val value = task.run val accumUpdates = Accumulators.values val result = new TaskResult(value, accumUpdates) - d.sendStatusUpdate(new TaskStatus( - taskId, TaskState.TASK_FINISHED, Utils.serialize(result))) - logInfo("Finished task ID " + taskId) + val status = TaskStatus.newBuilder() + .setTaskId(desc.getTaskId) + .setSlaveId(desc.getSlaveId) + .setState(TaskState.TASK_FINISHED) + .setData(ByteString.copyFrom(Utils.serialize(result))) + .build() + d.sendStatusUpdate(status) + logInfo("Finished task ID " + desc.getTaskId) } catch { case ffe: FetchFailedException => { val reason = ffe.toTaskEndReason - d.sendStatusUpdate(new TaskStatus( - taskId, TaskState.TASK_FAILED, Utils.serialize(reason))) + val status = TaskStatus.newBuilder() + .setTaskId(desc.getTaskId) + .setSlaveId(desc.getSlaveId) + .setState(TaskState.TASK_FAILED) + .setData(ByteString.copyFrom(Utils.serialize(reason))) + .build() + d.sendStatusUpdate(status) } case e: Exception => { // TODO: Handle errors in tasks less dramatically - logError("Exception in task ID " + taskId, e) + logError("Exception in task ID " + desc.getTaskId, e) System.exit(1) } } @@ -117,6 +125,18 @@ class Executor extends mesos.Executor with Logging { val out = new FileOutputStream(localPath) Utils.copyStream(in, out, true) } + + override def error(d: ExecutorDriver, code: Int, message: String) { + logError("Error from Mesos: %s (code %d)".format(message, code)) + } + + override def killTask(d: ExecutorDriver, tid: TaskID) { + logWarning("Mesos asked us to kill task " + tid + "; ignoring (not yet implemented)") + } + + override def shutdown(d: ExecutorDriver) {} + + override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} } /** diff --git a/core/src/main/scala/spark/HadoopFile.scala b/core/src/main/scala/spark/HadoopFile.scala index 0a7996c7bd..31bebd43db 100644 --- a/core/src/main/scala/spark/HadoopFile.scala +++ b/core/src/main/scala/spark/HadoopFile.scala @@ -1,7 +1,5 @@ package spark -import mesos.SlaveOffer - import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.FileInputFormat diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala index 6abbcbce51..9d75c08398 100644 --- a/core/src/main/scala/spark/Job.scala +++ b/core/src/main/scala/spark/Job.scala @@ -1,13 +1,14 @@ package spark -import mesos._ +import org.apache.mesos._ +import org.apache.mesos.Protos._ /** * Class representing a parallel job in MesosScheduler. Schedules the * job by implementing various callbacks. */ abstract class Job(jobId: Int) { - def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int) + def slaveOffer(s: SlaveOffer, availableCpus: Double, availableMem: Double) : Option[TaskDescription] def statusUpdate(t: TaskStatus): Unit diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index d635e95dba..393a33af8c 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -12,8 +12,11 @@ import scala.collection.mutable.Map import scala.collection.mutable.Queue import scala.collection.JavaConversions._ -import mesos.{Scheduler => MScheduler} -import mesos._ +import com.google.protobuf.ByteString + +import org.apache.mesos.{Scheduler => MScheduler} +import org.apache.mesos._ +import org.apache.mesos.Protos._ /** * The main Scheduler implementation, which runs jobs on Mesos. Clients should @@ -37,8 +40,8 @@ extends MScheduler with DAGScheduler with Logging private var activeJobs = new HashMap[Int, Job] private var activeJobsQueue = new Queue[Job] - private var taskIdToJobId = new HashMap[Int, Int] - private var jobTasks = new HashMap[Int, HashSet[Int]] + private var taskIdToJobId = new HashMap[String, Int] + private var jobTasks = new HashMap[Int, HashSet[String]] // Incrementing job and task IDs private var nextJobId = 0 @@ -59,10 +62,10 @@ extends MScheduler with DAGScheduler with Logging return id } - def newTaskId(): Int = { - val id = nextTaskId; + def newTaskId(): TaskID = { + val id = "" + nextTaskId; nextTaskId += 1; - return id + return TaskID.newBuilder().setValue(id).build() } override def start() { @@ -92,13 +95,21 @@ extends MScheduler with DAGScheduler with Logging "or the SparkContext constructor") } val execScript = new File(sparkHome, "spark-executor").getCanonicalPath - val params = new JHashMap[String, String] + val params = Params.newBuilder() for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) { if (System.getenv(key) != null) { - params("env." + key) = System.getenv(key) + params.addParam(Param.newBuilder() + .setKey("env." + key) + .setValue(System.getenv(key)) + .build()) } } - new ExecutorInfo(execScript, createExecArg(), params) + ExecutorInfo.newBuilder() + .setExecutorId(ExecutorID.newBuilder().setValue("default").build()) + .setUri(execScript) + .setData(ByteString.copyFrom(createExecArg())) + .setParams(params.build()) + .build() } @@ -125,7 +136,7 @@ extends MScheduler with DAGScheduler with Logging } } - override def registered(d: SchedulerDriver, frameworkId: String) { + override def registered(d: SchedulerDriver, frameworkId: FrameworkID) { logInfo("Registered as framework ID " + frameworkId) registeredLock.synchronized { isRegistered = true @@ -146,11 +157,11 @@ extends MScheduler with DAGScheduler with Logging * a round-robin manner so that tasks are balanced across the cluster. */ override def resourceOffer( - d: SchedulerDriver, oid: String, offers: JList[SlaveOffer]) { + d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) { synchronized { val tasks = new JArrayList[TaskDescription] - val availableCpus = offers.map(_.getParams.get("cpus").toInt) - val availableMem = offers.map(_.getParams.get("mem").toInt) + val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus")) + val availableMem = offers.map(o => getResource(o.getResourcesList(), "mem")) var launchedTask = false for (job <- activeJobsQueue) { do { @@ -160,10 +171,10 @@ extends MScheduler with DAGScheduler with Logging job.slaveOffer(offers(i), availableCpus(i), availableMem(i)) match { case Some(task) => tasks.add(task) - taskIdToJobId(task.getTaskId) = job.getId - jobTasks(job.getId) += task.getTaskId - availableCpus(i) -= task.getParams.get("cpus").toInt - availableMem(i) -= task.getParams.get("mem").toInt + taskIdToJobId(task.getTaskId.getValue) = job.getId + jobTasks(job.getId) += task.getTaskId.getValue + availableCpus(i) -= getResource(task.getResourcesList(), "cpus") + availableMem(i) -= getResource(task.getResourcesList(), "mem") launchedTask = true case None => {} } @@ -179,6 +190,13 @@ extends MScheduler with DAGScheduler with Logging } } + // Helper function to pull out a resource from a Mesos Resources protobuf + def getResource(res: JList[Resource], name: String): Double = { + for (r <- res if r.getName == name) + return r.getScalar.getValue + throw new IllegalArgumentException("No resource called " + name + " in " + res) + } + // Check whether a Mesos task state represents a finished task def isFinished(state: TaskState) = { state == TaskState.TASK_FINISHED || @@ -190,18 +208,18 @@ extends MScheduler with DAGScheduler with Logging override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { synchronized { try { - taskIdToJobId.get(status.getTaskId) match { + taskIdToJobId.get(status.getTaskId.getValue) match { case Some(jobId) => if (activeJobs.contains(jobId)) { activeJobs(jobId).statusUpdate(status) } if (isFinished(status.getState)) { - taskIdToJobId.remove(status.getTaskId) + taskIdToJobId.remove(status.getTaskId.getValue) if (jobTasks.contains(jobId)) - jobTasks(jobId) -= status.getTaskId + jobTasks(jobId) -= status.getTaskId.getValue } case None => - logInfo("Ignoring update from TID " + status.getTaskId + + logInfo("Ignoring update from TID " + status.getTaskId.getValue + " because its job is gone") } } catch { @@ -293,4 +311,10 @@ extends MScheduler with DAGScheduler with Logging // Serialize the map as an array of (String, String) pairs return Utils.serialize(props.toArray) } + + override def frameworkMessage(d: SchedulerDriver, s: SlaveID, e: ExecutorID, b: Array[Byte]) {} + + override def slaveLost(d: SchedulerDriver, s: SlaveID) {} + + override def offerRescinded(d: SchedulerDriver, o: OfferID) {} } diff --git a/core/src/main/scala/spark/ParallelArray.scala b/core/src/main/scala/spark/ParallelArray.scala index e77bc3014f..ad230833b8 100644 --- a/core/src/main/scala/spark/ParallelArray.scala +++ b/core/src/main/scala/spark/ParallelArray.scala @@ -1,7 +1,5 @@ package spark -import mesos.SlaveOffer - import java.util.concurrent.atomic.AtomicLong @serializable class ParallelArraySplit[T: ClassManifest]( diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 6334896cb6..56b32edf66 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -13,7 +13,6 @@ import scala.collection.mutable.HashMap import SparkContext._ -import mesos._ @serializable abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala index 00ff21369b..2961561f34 100644 --- a/core/src/main/scala/spark/SimpleJob.scala +++ b/core/src/main/scala/spark/SimpleJob.scala @@ -5,7 +5,10 @@ import java.util.{HashMap => JHashMap} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import mesos._ +import com.google.protobuf.ByteString + +import org.apache.mesos._ +import org.apache.mesos.Protos._ /** @@ -19,8 +22,8 @@ extends Job(jobId) with Logging val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong // CPUs and memory to request per task - val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt - val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toInt + val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble + val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toDouble // Maximum times a task is allowed to fail before failing the job val MAX_TASK_FAILURES = 4 @@ -31,7 +34,7 @@ extends Job(jobId) with Logging val launched = new Array[Boolean](numTasks) val finished = new Array[Boolean](numTasks) val numFailures = new Array[Int](numTasks) - val tidToIndex = HashMap[Int, Int]() + val tidToIndex = HashMap[String, Int]() var tasksLaunched = 0 var tasksFinished = 0 @@ -126,13 +129,13 @@ extends Job(jobId) with Logging } // Respond to an offer of a single slave from the scheduler by finding a task - def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int) + def slaveOffer(offer: SlaveOffer, availableCpus: Double, availableMem: Double) : Option[TaskDescription] = { if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK && availableMem >= MEM_PER_TASK) { val time = System.currentTimeMillis val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT) - val host = offer.getHost + val host = offer.getHostname findTask(host, localOnly) match { case Some(index) => { // Found a task; do some bookkeeping and return a Mesos task for it @@ -146,20 +149,35 @@ extends Job(jobId) with Logging jobId, index, taskId, offer.getSlaveId, host, prefStr) logInfo(message) // Do various bookkeeping - tidToIndex(taskId) = index + tidToIndex(taskId.getValue) = index launched(index) = true tasksLaunched += 1 if (preferred) lastPreferredLaunchTime = time // Create and return the Mesos task object - val params = new JHashMap[String, String] - params.put("cpus", CPUS_PER_TASK.toString) - params.put("mem", MEM_PER_TASK.toString) + val cpuRes = Resource.newBuilder() + .setName("cpus") + .setType(Resource.Type.SCALAR) + .setScalar(Resource.Scalar.newBuilder() + .setValue(CPUS_PER_TASK).build()) + .build() + val memRes = Resource.newBuilder() + .setName("mem") + .setType(Resource.Type.SCALAR) + .setScalar(Resource.Scalar.newBuilder() + .setValue(MEM_PER_TASK).build()) + .build() val serializedTask = Utils.serialize(task) logDebug("Serialized size: " + serializedTask.size) val taskName = "task %d:%d".format(jobId, index) - return Some(new TaskDescription( - taskId, offer.getSlaveId, taskName, params, serializedTask)) + return Some(TaskDescription.newBuilder() + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId) + .setName(taskName) + .addResources(cpuRes) + .addResources(memRes) + .setData(ByteString.copyFrom(serializedTask)) + .build()) } case _ => } @@ -183,13 +201,13 @@ extends Job(jobId) with Logging def taskFinished(status: TaskStatus) { val tid = status.getTaskId - val index = tidToIndex(tid) + val index = tidToIndex(tid.getValue) if (!finished(index)) { tasksFinished += 1 logInfo("Finished TID %d (progress: %d/%d)".format( tid, tasksFinished, numTasks)) // Deserialize task result - val result = Utils.deserialize[TaskResult[_]](status.getData) + val result = Utils.deserialize[TaskResult[_]](status.getData.toByteArray) sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates) // Mark finished and stop if we've finished all the tasks finished(index) = true @@ -203,15 +221,15 @@ extends Job(jobId) with Logging def taskLost(status: TaskStatus) { val tid = status.getTaskId - val index = tidToIndex(tid) + val index = tidToIndex(tid.getValue) if (!finished(index)) { logInfo("Lost TID %d (task %d:%d)".format(tid, jobId, index)) launched(index) = false tasksLaunched -= 1 // Check if the problem is a map output fetch failure. In that case, this // task will never succeed on any node, so tell the scheduler about it. - if (status.getData != null && status.getData.length > 0) { - val reason = Utils.deserialize[TaskEndReason](status.getData) + if (status.getData != null && status.getData.size > 0) { + val reason = Utils.deserialize[TaskEndReason](status.getData.toByteArray) reason match { case fetchFailed: FetchFailed => logInfo("Loss was due to fetch failure from " + fetchFailed.serverUri) diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala index 4771bdd0f2..a01eb7ba0b 100644 --- a/core/src/main/scala/spark/Task.scala +++ b/core/src/main/scala/spark/Task.scala @@ -1,7 +1,5 @@ package spark -import mesos._ - @serializable abstract class Task[T] { def run: T |