author     Matei Zaharia <matei@eecs.berkeley.edu>    2012-07-06 16:33:40 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2012-07-06 16:33:40 -0700
commit     4e2fe0bdaf7c2626d8b8461fed36259c9830a25c (patch)
tree       1d0a4253850285a3d7ceabe3b59f4af32862c925
parent     e72afdb817bcc8388aeb8b8d31628fd5fd67acf1 (diff)
Miscellaneous bug fixes
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                        2
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala                    8
-rw-r--r--  core/src/main/scala/spark/executor/MesosExecutorRunner.scala         3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala     1
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala      10
5 files changed, 15 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d35b2b1cac..2e8cb609b2 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -63,7 +63,7 @@ class SparkContext(
System.setProperty("spark.master.port", "0")
}
- private val isLocal = master.startsWith("local") // TODO: better check for local
+ private val isLocal = (master == "local" || master.startsWith("local["))
// Create the Spark execution environment (cache, map output tracker, etc)
val env = SparkEnv.createFromSystemProperties(
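Note: the old prefix check treated any master URL beginning with "local" as local mode; the new test accepts only "local" itself or "local[...]"-style URLs. A self-contained sketch of the stricter check (the helper name and the example master URLs are illustrative, not part of the patch):

    // Illustrative sketch of the stricter local-mode check.
    def isLocalMaster(master: String): Boolean =
      master == "local" || master.startsWith("local[")

    assert(isLocalMaster("local"))            // single-threaded local mode
    assert(isLocalMaster("local[4]"))         // local mode with 4 threads
    assert(!isLocalMaster("localhost:5050"))  // e.g. a Mesos master URL, no longer misclassified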
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index ad02b85254..ac30ae9aec 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -69,17 +69,19 @@ class Executor extends Logging {
val value = task.run(taskId.toInt)
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
- context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(result))
+ val serializedResult = ser.serialize(result)
+ logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
+ context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
- context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(reason))
+ context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}
case t: Throwable => {
val reason = ExceptionFailure(t)
- context.statusUpdate(taskId, TaskState.FINISHED, ser.serialize(reason))
+ context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
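Note: two things change in this hunk. The result is serialized once so its size can be logged before the same buffer is sent, and failures (fetch failures and uncaught exceptions) are now reported as TaskState.FAILED rather than FINISHED, letting the driver distinguish them from successful tasks. A minimal, self-contained illustration of the size-logging idea, with plain JDK serialization standing in for Spark's serializer:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    def serialize(value: AnyRef): ByteBuffer = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(value)
      out.close()
      ByteBuffer.wrap(bytes.toByteArray)
    }

    val serializedResult = serialize("some task result")
    // ByteBuffer.limit is the number of readable bytes, which is what the new log line reports.
    println("Serialized size of result is " + serializedResult.limit)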
diff --git a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala b/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
index 7695cbdfd7..f97d9d0bfa 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
@@ -6,6 +6,7 @@ import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => Mesos
import spark.TaskState.TaskState
import com.google.protobuf.ByteString
import spark.{Utils, Logging}
+import spark.TaskState
class MesosExecutorRunner(executor: Executor)
extends MesosExecutor
@@ -18,7 +19,7 @@ class MesosExecutorRunner(executor: Executor)
val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
.setTaskId(mesosTaskId)
- .setState(MesosTaskState.TASK_FINISHED)
+ .setState(TaskState.toMesos(state))
.setData(ByteString.copyFrom(data))
.build())
}
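Note: the runner now forwards whatever state the executor reported instead of hardcoding TASK_FINISHED, which is what makes the FAILED updates above visible to Mesos. The sketch below shows one plausible shape for the TaskState.toMesos mapping; the actual implementation lives in spark.TaskState and the exact cases here are an assumption.

    // Hedged sketch of a Spark-to-Mesos task-state mapping (cases are assumed, not from the patch).
    import org.apache.mesos.Protos.{TaskState => MesosTaskState}

    object TaskState extends Enumeration {
      type TaskState = Value
      val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

      def toMesos(state: TaskState): MesosTaskState = state match {
        case LAUNCHING => MesosTaskState.TASK_STARTING
        case RUNNING   => MesosTaskState.TASK_RUNNING
        case FINISHED  => MesosTaskState.TASK_FINISHED
        case FAILED    => MesosTaskState.TASK_FAILED
        case KILLED    => MesosTaskState.TASK_KILLED
        case LOST      => MesosTaskState.TASK_LOST
      }
    }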
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 75b67a0eb4..ab07f1c8c2 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -227,6 +227,7 @@ class TaskSetManager(
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ logInfo("statusUpdate: " + tid + " is now " + state + " " + serializedData)
state match {
case TaskState.FINISHED =>
taskFinished(tid, state, serializedData)
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
index 8131d84fdf..f5c35becda 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
@@ -188,10 +188,12 @@ class MesosScheduler(
for ((taskList, index) <- taskLists.zipWithIndex) {
if (!taskList.isEmpty) {
val offerNum = offerableIndices(index)
+ val slaveId = offers(offerNum).getSlaveId.getValue
+ slaveIdsWithExecutors += slaveId
mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
for (taskDesc <- taskList) {
- taskIdToSlaveId(taskDesc.taskId) = offers(offerNum).getSlaveId.getValue
- mesosTasks(offerNum).add(createMesosTask(taskDesc, offers(offerNum).getSlaveId))
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
}
}
}
@@ -214,7 +216,7 @@ class MesosScheduler(
}
/** Turn a Spark TaskDescription into a Mesos task */
- def createMesosTask(task: TaskDescription, slaveId: SlaveID): MesosTaskInfo = {
+ def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
val cpuResource = Resource.newBuilder()
.setName("cpus")
@@ -223,7 +225,7 @@ class MesosScheduler(
.build()
return MesosTaskInfo.newBuilder()
.setTaskId(taskId)
- .setSlaveId(slaveId)
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(executorInfo)
.setName(task.name)
.addResources(cpuResource)
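Note: on the scheduler side, the slave is now identified by its plain String id everywhere (taskIdToSlaveId, slaveIdsWithExecutors, createMesosTask), and the Mesos SlaveID protobuf is rebuilt only at the point where the Mesos API requires it. A small sketch of that conversion, using the protobuf builder already shown in the diff (the helper name and placeholder id are illustrative):

    import org.apache.mesos.Protos.SlaveID

    // Rebuild the protobuf from the cached String id right before handing a task to Mesos.
    def slaveIdFor(id: String): SlaveID =
      SlaveID.newBuilder().setValue(id).build()

    val proto = slaveIdFor("slave-1")  // placeholder id for illustration
    println(proto.getValue)            // prints "slave-1"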