diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-06 16:33:40 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-06 16:33:40 -0700 |
commit | 4e2fe0bdaf7c2626d8b8461fed36259c9830a25c (patch) | |
tree | 1d0a4253850285a3d7ceabe3b59f4af32862c925 | |
parent | e72afdb817bcc8388aeb8b8d31628fd5fd67acf1 (diff) | |
download | spark-4e2fe0bdaf7c2626d8b8461fed36259c9830a25c.tar.gz spark-4e2fe0bdaf7c2626d8b8461fed36259c9830a25c.tar.bz2 spark-4e2fe0bdaf7c2626d8b8461fed36259c9830a25c.zip |
Miscellaneous bug fixes
5 files changed, 15 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index d35b2b1cac..2e8cb609b2 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -63,7 +63,7 @@ class SparkContext( System.setProperty("spark.master.port", "0") } - private val isLocal = master.startsWith("local") // TODO: better check for local + private val isLocal = (master == "local" || master.startsWith("local[")) // Create the Spark execution environment (cache, map output tracker, etc) val env = SparkEnv.createFromSystemProperties( diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index ad02b85254..ac30ae9aec 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -69,17 +69,19 @@ class Executor extends Logging { val value = task.run(taskId.toInt) val accumUpdates = Accumulators.values val result = new TaskResult(value, accumUpdates) - context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(result)) + val serializedResult = ser.serialize(result) + logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit) + context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) logInfo("Finished task ID " + taskId) } catch { case ffe: FetchFailedException => { val reason = ffe.toTaskEndReason - context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(reason)) + context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) } case t: Throwable => { val reason = ExceptionFailure(t) - context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(reason)) + context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) // TODO: Should we exit the whole executor here? On the one hand, the failed task may // have left some weird state around depending on when the exception was thrown, but on diff --git a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala b/core/src/main/scala/spark/executor/MesosExecutorRunner.scala index 7695cbdfd7..f97d9d0bfa 100644 --- a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala +++ b/core/src/main/scala/spark/executor/MesosExecutorRunner.scala @@ -6,6 +6,7 @@ import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => Mesos import spark.TaskState.TaskState import com.google.protobuf.ByteString import spark.{Utils, Logging} +import spark.TaskState class MesosExecutorRunner(executor: Executor) extends MesosExecutor @@ -18,7 +19,7 @@ class MesosExecutorRunner(executor: Executor) val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build() driver.sendStatusUpdate(MesosTaskStatus.newBuilder() .setTaskId(mesosTaskId) - .setState(MesosTaskState.TASK_FINISHED) + .setState(TaskState.toMesos(state)) .setData(ByteString.copyFrom(data)) .build()) } diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 75b67a0eb4..ab07f1c8c2 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -227,6 +227,7 @@ class TaskSetManager( } def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { + logInfo("statusUpdate: " + tid + " is now " + state + " " + serializedData) state match { case TaskState.FINISHED => taskFinished(tid, state, serializedData) diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala index 8131d84fdf..f5c35becda 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala @@ -188,10 +188,12 @@ class MesosScheduler( for ((taskList, index) <- taskLists.zipWithIndex) { if (!taskList.isEmpty) { val offerNum = offerableIndices(index) + val slaveId = offers(offerNum).getSlaveId.getValue + slaveIdsWithExecutors += slaveId mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) for (taskDesc <- taskList) { - taskIdToSlaveId(taskDesc.taskId) = offers(offerNum).getSlaveId.getValue - mesosTasks(offerNum).add(createMesosTask(taskDesc, offers(offerNum).getSlaveId)) + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) } } } @@ -214,7 +216,7 @@ class MesosScheduler( } /** Turn a Spark TaskDescription into a Mesos task */ - def createMesosTask(task: TaskDescription, slaveId: SlaveID): MesosTaskInfo = { + def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() val cpuResource = Resource.newBuilder() .setName("cpus") @@ -223,7 +225,7 @@ class MesosScheduler( .build() return MesosTaskInfo.newBuilder() .setTaskId(taskId) - .setSlaveId(slaveId) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setExecutor(executorInfo) .setName(task.name) .addResources(cpuResource) |