author     Reynold Xin <rxin@apache.org>   2014-07-16 11:50:49 -0700
committer  Reynold Xin <rxin@apache.org>   2014-07-16 11:50:49 -0700
commit     7c8d123225bbdcc605642099b107c2d843e87340 (patch)
tree       cf5e86a5cb1a375b8baa8a5a35299bbedab59692 /core
parent     caa163f0868776d91697a9041528e382a789f0c3 (diff)
download   spark-7c8d123225bbdcc605642099b107c2d843e87340.tar.gz
           spark-7c8d123225bbdcc605642099b107c2d843e87340.tar.bz2
           spark-7c8d123225bbdcc605642099b107c2d843e87340.zip
[SPARK-2317] Improve task logging.
We use the TID to identify tasks in log messages. However, the TID itself does not capture the stage or retries, which makes it harder to correlate log lines with the application itself. This pull request changes all task-related log messages to include the TID together with the stage id, stage attempt, task id, and task attempt. I've consulted various people, but unfortunately this is a really hard task.

The driver log now looks like:
```
14/06/28 18:53:29 INFO DAGScheduler: Submitting 10 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:13)
14/06/28 18:53:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
14/06/28 18:53:29 INFO TaskSetManager: Re-computing pending task lists.
14/07/15 19:44:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 4, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 5, localhost, PROCESS_LOCAL, 1855 bytes)
14/07/15 19:44:40 INFO TaskSetManager: Starting task 6.0 in stage 1.0 (TID 6, localhost, PROCESS_LOCAL, 1855 bytes)
...
14/07/15 19:44:40 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 64 ms on localhost (4/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 4) in 63 ms on localhost (5/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 2) in 63 ms on localhost (6/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 7) in 62 ms on localhost (7/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 6) in 63 ms on localhost (8/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 9) in 8 ms on localhost (9/10)
14/07/15 19:44:40 INFO TaskSetManager: Finished task 8.0 in stage 1.0 (TID 8) in 9 ms on localhost (10/10)
```

The executor log looks like:
```
14/07/15 19:44:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 0)
14/07/15 19:44:40 INFO Executor: Running task 3.0 in stage 1.0 (TID 3)
14/07/15 19:44:40 INFO Executor: Running task 1.0 in stage 1.0 (TID 1)
14/07/15 19:44:40 INFO Executor: Running task 4.0 in stage 1.0 (TID 4)
14/07/15 19:44:40 INFO Executor: Running task 2.0 in stage 1.0 (TID 2)
14/07/15 19:44:40 INFO Executor: Running task 5.0 in stage 1.0 (TID 5)
14/07/15 19:44:40 INFO Executor: Running task 6.0 in stage 1.0 (TID 6)
14/07/15 19:44:40 INFO Executor: Running task 7.0 in stage 1.0 (TID 7)
14/07/15 19:44:40 INFO Executor: Finished task 3.0 in stage 1.0 (TID 3). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 2.0 in stage 1.0 (TID 2). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 1.0 in stage 1.0 (TID 1). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 5.0 in stage 1.0 (TID 5). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 4.0 in stage 1.0 (TID 4). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 6.0 in stage 1.0 (TID 6). 847 bytes result sent to driver
14/07/15 19:44:40 INFO Executor: Finished task 7.0 in stage 1.0 (TID 7). 847 bytes result sent to driver
```

Author: Reynold Xin <rxin@apache.org>

Closes #1259 from rxin/betterTaskLogging and squashes the following commits:

c28ada1 [Reynold Xin] Fix unit test failure.
987d043 [Reynold Xin] Updated log messages.
c6cfd46 [Reynold Xin] Merge branch 'master' into betterTaskLogging
b7b1bcc [Reynold Xin] Fixed a typo.
f9aba3c [Reynold Xin] Made it compile.
f8a5c06 [Reynold Xin] Merge branch 'master' into betterTaskLogging
07264e6 [Reynold Xin] Defensive check against unknown TaskEndReason.
76bbd18 [Reynold Xin] FailureSuite not serializable reporting.
4659b20 [Reynold Xin] Remove unused variable.
53888e3 [Reynold Xin] [SPARK-2317] Improve task logging.
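To make the naming scheme concrete, here is a minimal, self-contained Scala sketch of how such a log line is composed from the stage id, stage attempt, task index, and task attempt. The names `TaskNamingSketch`, `StageKey`, and `TaskKey` are illustrative assumptions, not code from this commit; only the "task <index>.<attempt> in stage <stageId>.<stageAttempt> (TID n)" format itself comes from the diff below.

```scala
// Minimal sketch, not Spark code: how the task names in the new log lines are composed.
// StageKey and TaskKey are illustrative stand-ins; only the name format comes from the diff.
object TaskNamingSketch {
  // A stage is reported as "<stageId>.<stageAttempt>", e.g. "1.0".
  final case class StageKey(stageId: Int, attempt: Int) {
    override def toString: String = s"$stageId.$attempt"
  }

  // A task within a stage is "<index>.<attempt>", mirroring the TaskInfo.id helper added below.
  final case class TaskKey(tid: Long, index: Int, attempt: Int) {
    def id: String = s"$index.$attempt"
  }

  def startingLine(stage: StageKey, task: TaskKey, host: String, bytes: Long): String =
    s"Starting task ${task.id} in stage $stage (TID ${task.tid}, $host, PROCESS_LOCAL, $bytes bytes)"

  def main(args: Array[String]): Unit = {
    // Prints: Starting task 2.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1855 bytes)
    println(startingLine(StageKey(1, 0), TaskKey(tid = 2, index = 2, attempt = 0), "localhost", 1855))
  }
}
```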
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskEndReason.scala                          |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                      | 43
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                 |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala                     |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala               | 90
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala           |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/FailureSuite.scala                           |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                           |  2
10 files changed, 78 insertions(+), 76 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index df42d679b4..8d5c45627f 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -89,8 +89,9 @@ case class ExceptionFailure(
metrics: Option[TaskMetrics])
extends TaskFailedReason {
override def toErrorString: String = {
- val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
- s"$className ($description}\n$stackTraceString"
+ val stackTraceString =
+ if (stackTrace == null) "null" else stackTrace.map(" " + _).mkString("\n")
+ s"$className ($description)\n$stackTraceString"
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8d31bd05fd..b455c9fcf4 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -71,7 +71,7 @@ private[spark] class CoarseGrainedExecutorBackend(
val ser = SparkEnv.get.closureSerializer.newInstance()
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
- executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)
}
case KillTask(taskId, _, interruptThread) =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4d3ba11633..b16133b20c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -107,8 +107,9 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
- def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
- val tr = new TaskRunner(context, taskId, serializedTask)
+ def launchTask(
+ context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
+ val tr = new TaskRunner(context, taskId, taskName, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
@@ -135,14 +136,15 @@ private[spark] class Executor(
localDirs
}
- class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
+ class TaskRunner(
+ execBackend: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable {
@volatile private var killed = false
@volatile private var task: Task[Any] = _
def kill(interruptThread: Boolean) {
- logInfo("Executor is trying to kill task " + taskId)
+ logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
killed = true
if (task != null) {
task.kill(interruptThread)
@@ -154,7 +156,7 @@ private[spark] class Executor(
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
- logInfo("Running task ID " + taskId)
+ logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
@@ -207,25 +209,30 @@ private[spark] class Executor(
val accumUpdates = Accumulators.values
- val directResult = new DirectTaskResult(valueBytes, accumUpdates,
- task.metrics.getOrElse(null))
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
val serializedDirectResult = ser.serialize(directResult)
- logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
- val serializedResult = {
- if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
- logInfo("Storing result for " + taskId + " in local BlockManager")
+ val resultSize = serializedDirectResult.limit
+
+ // directSend = sending directly back to the driver
+ val (serializedResult, directSend) = {
+ if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
- ser.serialize(new IndirectTaskResult[Any](blockId))
+ (ser.serialize(new IndirectTaskResult[Any](blockId)), false)
} else {
- logInfo("Sending result for " + taskId + " directly to driver")
- serializedDirectResult
+ (serializedDirectResult, true)
}
}
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
- logInfo("Finished task ID " + taskId)
+
+ if (directSend) {
+ logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
+ } else {
+ logInfo(
+ s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
+ }
} catch {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
@@ -233,7 +240,7 @@ private[spark] class Executor(
}
case _: TaskKilledException | _: InterruptedException if task.killed => {
- logInfo("Executor killed task " + taskId)
+ logInfo(s"Executor killed $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}
@@ -241,7 +248,7 @@ private[spark] class Executor(
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
// the default uncaught exception handler, which will terminate the Executor.
- logError("Exception in task ID " + taskId, t)
+ logError(s"Exception in $taskName (TID $taskId)", t)
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
@@ -249,7 +256,7 @@ private[spark] class Executor(
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
- val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
+ val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// Don't forcibly exit unless the exception was inherently fatal, to avoid
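The Executor hunk above decides between sending the serialized result straight back to the driver and storing it in the block manager when it would not fit in an Akka frame, and the new log line reports which path was taken. The sketch below restates that decision with assumed names (`routeResult`, `RoutedResult`, `store`); it is an illustration only, not Spark's actual API.

```scala
import java.nio.ByteBuffer

// Hedged sketch with assumed names (RoutedResult, routeResult, store); it restates the
// size check from the Executor hunk above but is not Spark's actual API.
object ResultRoutingSketch {
  sealed trait RoutedResult
  final case class Direct(data: ByteBuffer) extends RoutedResult    // whole result rides in the status update
  final case class Indirect(blockId: String) extends RoutedResult   // only a block reference is sent

  /** Returns the payload to send plus a directSend flag, as the diff's log messages distinguish. */
  def routeResult(
      serialized: ByteBuffer,
      frameSize: Int,
      reservedBytes: Int,
      store: ByteBuffer => String): (RoutedResult, Boolean) = {
    val resultSize = serialized.limit()
    if (resultSize >= frameSize - reservedBytes) {
      // Too large for a direct reply: hand the bytes to block storage and send a reference.
      (Indirect(store(serialized)), false)
    } else {
      (Direct(serialized), true)
    }
  }
}
```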
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 2232e6237b..a42c8b43bb 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -64,7 +64,7 @@ private[spark] class MesosExecutorBackend
if (executor == null) {
logError("Received launchTask but executor was null")
} else {
- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+ executor.launchTask(this, taskId, taskInfo.getName, taskInfo.getData.asReadOnlyByteBuffer)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f72bfde572..ede3c7d9f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -816,7 +816,6 @@ class DAGScheduler(
}
event.reason match {
case Success =>
- logInfo("Completed " + task)
if (event.accumUpdates != null) {
// TODO: fail the stage if the accumulator update fails...
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 29de0453ac..ca0595f351 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -84,6 +84,8 @@ class TaskInfo(
}
}
+ def id: String = s"$index.$attempt"
+
def duration: Long = {
if (!finished) {
throw new UnsupportedOperationException("duration() called on unfinished task")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3bdc71d93b..8b5e8cb802 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -26,8 +26,7 @@ import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
-import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
- SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
+import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.{Clock, SystemClock}
@@ -52,8 +51,8 @@ private[spark] class TaskSetManager(
val taskSet: TaskSet,
val maxTaskFailures: Int,
clock: Clock = SystemClock)
- extends Schedulable with Logging
-{
+ extends Schedulable with Logging {
+
val conf = sched.sc.conf
/*
@@ -403,14 +402,11 @@ private[spark] class TaskSetManager(
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
- // Figure out whether this should count as a preferred launch
- logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
- taskSet.id, index, taskId, execId, host, taskLocality))
// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
- val info = new TaskInfo(
- taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
+ val info = new TaskInfo(taskId, index, attemptNum, curTime,
+ execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
@@ -429,11 +425,15 @@ private[spark] class TaskSetManager(
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
- val timeTaken = clock.getTime() - startTime
addRunningTask(taskId)
- logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
- taskSet.id, index, serializedTask.limit, timeTaken))
- val taskName = "task %s:%d".format(taskSet.id, index)
+
+ // We used to log the time it takes to serialize the task, but task size is already
+ // a good proxy to task serialization time.
+ // val timeTaken = clock.getTime() - startTime
+ val taskName = s"task ${info.id} in stage ${taskSet.id}"
+ logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
+ taskName, taskId, host, taskLocality, serializedTask.limit))
+
sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
}
@@ -492,19 +492,19 @@ private[spark] class TaskSetManager(
info.markSuccessful()
removeRunningTask(tid)
sched.dagScheduler.taskEnded(
- tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+ tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {
tasksSuccessful += 1
- logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
- tid, info.duration, info.host, tasksSuccessful, numTasks))
+ logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
+ info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
isZombie = true
}
} else {
- logInfo("Ignorning task-finished event for TID " + tid + " because task " +
- index + " has already completed successfully")
+ logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
+ " because task " + index + " has already completed successfully")
}
failedExecutors.remove(index)
maybeFinishTaskSet()
@@ -523,14 +523,13 @@ private[spark] class TaskSetManager(
info.markFailed()
val index = info.index
copiesRunning(index) -= 1
- if (!isZombie) {
- logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
- }
var taskMetrics : TaskMetrics = null
- var failureReason: String = null
+
+ val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
+ reason.asInstanceOf[TaskFailedReason].toErrorString
reason match {
case fetchFailed: FetchFailed =>
- logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+ logWarning(failureReason)
if (!successful(index)) {
successful(index) = true
tasksSuccessful += 1
@@ -538,23 +537,17 @@ private[spark] class TaskSetManager(
// Not adding to failed executors for FetchFailed.
isZombie = true
- case TaskKilled =>
- // Not adding to failed executors for TaskKilled.
- logWarning("Task %d was killed.".format(tid))
-
case ef: ExceptionFailure =>
- taskMetrics = ef.metrics.getOrElse(null)
- if (ef.className == classOf[NotSerializableException].getName()) {
+ taskMetrics = ef.metrics.orNull
+ if (ef.className == classOf[NotSerializableException].getName) {
// If the task result wasn't serializable, there's no point in trying to re-execute it.
- logError("Task %s:%s had a not serializable result: %s; not retrying".format(
- taskSet.id, index, ef.description))
- abort("Task %s:%s had a not serializable result: %s".format(
- taskSet.id, index, ef.description))
+ logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
+ .format(info.id, taskSet.id, tid, ef.description))
+ abort("Task %s in stage %s (TID %d) had a not serializable result: %s".format(
+ info.id, taskSet.id, tid, ef.description))
return
}
val key = ef.description
- failureReason = "Exception failure in TID %s on host %s: %s\n%s".format(
- tid, info.host, ef.description, ef.stackTrace.map(" " + _).mkString("\n"))
val now = clock.getTime()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
@@ -572,19 +565,18 @@ private[spark] class TaskSetManager(
}
}
if (printFull) {
- val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
- logWarning("Loss was due to %s\n%s\n%s".format(
- ef.className, ef.description, locs.mkString("\n")))
+ logWarning(failureReason)
} else {
- logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+ logInfo(
+ s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on executor ${info.host}: " +
+ s"${ef.className} (${ef.description}) [duplicate $dupCount]")
}
- case TaskResultLost =>
- failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
+ case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
logWarning(failureReason)
- case _ =>
- failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
+ case e: TaskEndReason =>
+ logError("Unknown TaskEndReason: " + e)
}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
@@ -595,10 +587,10 @@ private[spark] class TaskSetManager(
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
- logError("Task %s:%d failed %d times; aborting job".format(
- taskSet.id, index, maxTaskFailures))
- abort("Task %s:%d failed %d times, most recent failure: %s\nDriver stacktrace:".format(
- taskSet.id, index, maxTaskFailures, failureReason))
+ logError("Task %d in stage %s failed %d times; aborting job".format(
+ index, taskSet.id, maxTaskFailures))
+ abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
+ .format(index, taskSet.id, maxTaskFailures, failureReason))
return
}
}
@@ -711,8 +703,8 @@ private[spark] class TaskSetManager(
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
logInfo(
- "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
- taskSet.id, index, info.host, threshold))
+ "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
+ .format(index, taskSet.id, info.host, threshold))
speculatableTasks += index
foundTasks = true
}
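The TaskSetManager changes build a single failure description in the new "Lost task X.Y in stage S.A (TID n, host): reason" format and reuse it across the failure branches, with a catch-all that logs an unrecognized TaskEndReason instead of ignoring it. The following sketch uses simplified local stand-ins for Spark's reason hierarchy to show the same pattern; none of these types are the real ones.

```scala
// Simplified stand-ins for Spark's reason hierarchy (none of these are the real types):
// one failure string is built in the new format and reused, with a defensive catch-all
// for reasons the scheduler does not recognize.
object FailureReasonSketch {
  sealed trait TaskEndReason
  sealed trait TaskFailedReason extends TaskEndReason { def toErrorString: String }
  final case class ExceptionFailure(className: String, description: String) extends TaskFailedReason {
    def toErrorString: String = s"$className ($description)"
  }
  final case class UnrecognizedReason(tag: String) extends TaskEndReason

  def describeFailure(taskId: String, stageId: String, tid: Long, host: String,
      reason: TaskEndReason): String =
    reason match {
      case r: TaskFailedReason =>
        s"Lost task $taskId in stage $stageId (TID $tid, $host): ${r.toErrorString}"
      case other =>
        s"Unknown TaskEndReason: $other"   // mirrors the defensive logError branch above
    }

  def main(args: Array[String]): Unit = {
    println(describeFailure("3.1", "2.0", 17, "host-a",
      ExceptionFailure("java.io.IOException", "disk full")))
    println(describeFailure("3.1", "2.0", 17, "host-a", UnrecognizedReason("mystery")))
  }
}
```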
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 9b95ccca04..e9f6273bfd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -69,7 +69,7 @@ private[spark] class LocalActor(
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= 1
- executor.launchTask(executorBackend, task.taskId, task.serializedTask)
+ executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask)
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index e755d2e309..2229e6acc4 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -104,8 +104,9 @@ class FailureSuite extends FunSuite with LocalSparkContext {
results.collect()
}
assert(thrown.getClass === classOf[SparkException])
- assert(thrown.getMessage.contains("NotSerializableException") ||
- thrown.getCause.getClass === classOf[NotSerializableException])
+ assert(thrown.getMessage.contains("serializable") ||
+ thrown.getCause.getClass === classOf[NotSerializableException],
+ "Exception does not contain \"serializable\": " + thrown.getMessage)
FailureSuiteState.clear()
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index c4f2f7e34f..237e644b48 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -240,7 +240,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
}
assert(thrown.getClass === classOf[SparkException])
- assert(thrown.getMessage.contains("NotSerializableException"))
+ assert(thrown.getMessage.toLowerCase.contains("serializable"))
}
}