diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-14 16:17:23 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-14 16:17:23 -0700 |
commit | 839f2d4f3f7f39615c1c840b0d7c9394da6a2e64 (patch) | |
tree | 618a9d46c379f361b6e41528144be6a0032127dd /core | |
parent | 63446f9208876b482c2aea4acc42a6713eb94f55 (diff) | |
parent | 04ad78b09d195e52d2747c18fe2e3a4640abf838 (diff) | |
download | spark-839f2d4f3f7f39615c1c840b0d7c9394da6a2e64.tar.gz spark-839f2d4f3f7f39615c1c840b0d7c9394da6a2e64.tar.bz2 spark-839f2d4f3f7f39615c1c840b0d7c9394da6a2e64.zip |
Merge pull request #822 from pwendell/ui-features
Adding GC Stats to TaskMetrics (and three small fixes)
Diffstat (limited to 'core')
6 files changed, 54 insertions, 27 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 8a74a8d853..05a960d7c5 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -17,18 +17,17 @@ package spark.executor -import java.io.{File, FileOutputStream} -import java.net.{URI, URL, URLClassLoader} +import java.io.{File} +import java.lang.management.ManagementFactory +import java.nio.ByteBuffer import java.util.concurrent._ -import org.apache.hadoop.fs.FileUtil - -import scala.collection.mutable.{ArrayBuffer, Map, HashMap} +import scala.collection.JavaConversions._ +import scala.collection.mutable.HashMap -import spark.broadcast._ import spark.scheduler._ import spark._ -import java.nio.ByteBuffer + /** * The Mesos executor for Spark. @@ -116,6 +115,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var attemptedTask: Option[Task[Any]] = None var taskStart: Long = 0 + def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum + val startGCTime = getTotalGCTime + try { SparkEnv.set(env) Accumulators.clear() @@ -128,10 +130,11 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert taskStart = System.currentTimeMillis() val value = task.run(taskId.toInt) val taskFinish = System.currentTimeMillis() - task.metrics.foreach{ m => + for (m <- task.metrics) { m.hostname = Utils.localHostName m.executorDeserializeTime = (taskStart - startTime).toInt m.executorRunTime = (taskFinish - taskStart).toInt + m.jvmGCTime = getTotalGCTime - startGCTime } //TODO I'd also like to track the time it takes to serialize the task results, but that is huge headache, b/c // we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could @@ -155,7 +158,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert case t: Throwable => { val serviceTime = (System.currentTimeMillis() - taskStart).toInt val metrics = attemptedTask.flatMap(t => t.metrics) - metrics.foreach{m => m.executorRunTime = serviceTime} + for (m <- metrics) { + m.executorRunTime = serviceTime + m.jvmGCTime = getTotalGCTime - startGCTime + } val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index 3151627839..47b8890bee 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -31,7 +31,7 @@ class TaskMetrics extends Serializable { /** * Time the executor spends actually running the task (including fetching shuffle data) */ - var executorRunTime:Int = _ + var executorRunTime: Int = _ /** * The number of bytes this task transmitted back to the driver as the TaskResult @@ -39,6 +39,11 @@ class TaskMetrics extends Serializable { var resultSize: Long = _ /** + * Amount of time the JVM spent in garbage collection while executing this task + */ + var jvmGCTime: Long = _ + + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ var shuffleReadMetrics: Option[ShuffleReadMetrics] = None diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index f274b1a767..6c43928bc8 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -18,8 +18,11 @@ package spark.scheduler.local import java.io.File +import java.lang.management.ManagementFactory import java.util.concurrent.atomic.AtomicInteger import java.nio.ByteBuffer + +import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet @@ -173,6 +176,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: var attemptedTask: Option[Task[_]] = None val start = System.currentTimeMillis() var taskStart: Long = 0 + def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum + val startGCTime = getTotalGCTime + try { Accumulators.clear() Thread.currentThread().setContextClassLoader(classLoader) @@ -202,6 +208,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: val serviceTime = System.currentTimeMillis() - taskStart logInfo("Finished " + taskId) deserializedTask.metrics.get.executorRunTime = serviceTime.toInt + deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null)) val serializedResult = ser.serialize(taskResult) @@ -210,7 +217,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: case t: Throwable => { val serviceTime = System.currentTimeMillis() - taskStart val metrics = attemptedTask.flatMap(t => t.metrics) - metrics.foreach{m => m.executorRunTime = serviceTime.toInt} + for (m <- metrics) { + m.executorRunTime = serviceTime.toInt + m.jvmGCTime = getTotalGCTime - startGCTime + } val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure)) } diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 1b071a91e5..f91a415e37 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -85,9 +85,10 @@ private[spark] class StagePage(parent: JobProgressUI) { Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++ + Seq("GC Time") ++ Seq("Details") - val taskTable = listingTable(taskHeaders, taskRow, tasks) + val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined)) @@ -135,7 +136,8 @@ private[spark] class StagePage(parent: JobProgressUI) { } - def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = { + def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean) + (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = { def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = trace.map(e => <span style="display:block;">{e.toString}</span>) val (info, metrics, exception) = taskData @@ -144,6 +146,7 @@ private[spark] class StagePage(parent: JobProgressUI) { else metrics.map(m => m.executorRunTime).getOrElse(1) val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") + val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L) <tr> <td>{info.taskId}</td> @@ -154,10 +157,17 @@ private[spark] class StagePage(parent: JobProgressUI) { <td>{info.taskLocality}</td> <td>{info.hostPort}</td> <td>{dateFmt.format(new Date(info.launchTime))}</td> - {metrics.flatMap{m => m.shuffleReadMetrics}.map{s => - <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")} - {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => - <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")} + {if (shuffleRead) { + <td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s => + Utils.memoryBytesToString(s.remoteBytesRead)}.getOrElse("")}</td> + }} + {if (shuffleWrite) { + <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => + Utils.memoryBytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td> + }} + <td sorttable_customkey={gcTime.toString}> + {if (gcTime > 0) parent.formatDuration(gcTime) else ""} + </td> <td>{exception.map(e => <span> {e.className} ({e.description})<br/> diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala index 5068a025fa..19b07cceda 100644 --- a/core/src/main/scala/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala @@ -49,13 +49,6 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU </table> } - private def getElapsedTime(submitted: Option[Long], completed: Long): String = { - submitted match { - case Some(t) => parent.formatDuration(completed - t) - case _ => "Unknown" - } - } - private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) val startWidth = "width: %s%%".format((started.toDouble/total)*100) @@ -98,6 +91,8 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a> val description = listener.stageToDescription.get(s) .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink) + val finishTime = s.completionTime.getOrElse(System.currentTimeMillis()) + val duration = s.submissionTime.map(t => finishTime - t) <tr> <td>{s.id}</td> @@ -106,8 +101,9 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU } <td>{description}</td> <td valign="middle">{submissionTime}</td> - <td>{getElapsedTime(s.submissionTime, - s.completionTime.getOrElse(System.currentTimeMillis()))}</td> + <td sorttable_customkey={duration.getOrElse(-1).toString}> + {duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")} + </td> <td class="progress-cell"> {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)} </td> diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala index 4c3ee12c98..40f94b42a6 100644 --- a/core/src/main/scala/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala @@ -123,7 +123,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) { <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> <td> {Utils.memoryBytesToString(status.memUsed(prefix))} - ({Utils.memoryBytesToString(status.memRemaining)} Total Available) + ({Utils.memoryBytesToString(status.memRemaining)} Remaining) </td> <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td> </tr> |