aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-08-14 16:17:23 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-08-14 16:17:23 -0700
commit839f2d4f3f7f39615c1c840b0d7c9394da6a2e64 (patch)
tree618a9d46c379f361b6e41528144be6a0032127dd /core
parent63446f9208876b482c2aea4acc42a6713eb94f55 (diff)
parent04ad78b09d195e52d2747c18fe2e3a4640abf838 (diff)
downloadspark-839f2d4f3f7f39615c1c840b0d7c9394da6a2e64.tar.gz
spark-839f2d4f3f7f39615c1c840b0d7c9394da6a2e64.tar.bz2
spark-839f2d4f3f7f39615c1c840b0d7c9394da6a2e64.zip
Merge pull request #822 from pwendell/ui-features
Adding GC Stats to TaskMetrics (and three small fixes)
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala24
-rw-r--r--core/src/main/scala/spark/executor/TaskMetrics.scala7
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala12
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala22
-rw-r--r--core/src/main/scala/spark/ui/jobs/StageTable.scala14
-rw-r--r--core/src/main/scala/spark/ui/storage/RDDPage.scala2
6 files changed, 54 insertions, 27 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8a74a8d853..05a960d7c5 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -17,18 +17,17 @@
package spark.executor
-import java.io.{File, FileOutputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.io.{File}
+import java.lang.management.ManagementFactory
+import java.nio.ByteBuffer
import java.util.concurrent._
-import org.apache.hadoop.fs.FileUtil
-
-import scala.collection.mutable.{ArrayBuffer, Map, HashMap}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
-import spark.broadcast._
import spark.scheduler._
import spark._
-import java.nio.ByteBuffer
+
/**
* The Mesos executor for Spark.
@@ -116,6 +115,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
+ def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+ val startGCTime = getTotalGCTime
+
try {
SparkEnv.set(env)
Accumulators.clear()
@@ -128,10 +130,11 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
- task.metrics.foreach{ m =>
+ for (m <- task.metrics) {
m.hostname = Utils.localHostName
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
+ m.jvmGCTime = getTotalGCTime - startGCTime
}
//TODO I'd also like to track the time it takes to serialize the task results, but that is huge headache, b/c
// we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could
@@ -155,7 +158,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
- metrics.foreach{m => m.executorRunTime = serviceTime}
+ for (m <- metrics) {
+ m.executorRunTime = serviceTime
+ m.jvmGCTime = getTotalGCTime - startGCTime
+ }
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 3151627839..47b8890bee 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -31,7 +31,7 @@ class TaskMetrics extends Serializable {
/**
* Time the executor spends actually running the task (including fetching shuffle data)
*/
- var executorRunTime:Int = _
+ var executorRunTime: Int = _
/**
* The number of bytes this task transmitted back to the driver as the TaskResult
@@ -39,6 +39,11 @@ class TaskMetrics extends Serializable {
var resultSize: Long = _
/**
+ * Amount of time the JVM spent in garbage collection while executing this task
+ */
+ var jvmGCTime: Long = _
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index f274b1a767..6c43928bc8 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -18,8 +18,11 @@
package spark.scheduler.local
import java.io.File
+import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
@@ -173,6 +176,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var attemptedTask: Option[Task[_]] = None
val start = System.currentTimeMillis()
var taskStart: Long = 0
+ def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+ val startGCTime = getTotalGCTime
+
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
@@ -202,6 +208,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val serviceTime = System.currentTimeMillis() - taskStart
logInfo("Finished " + taskId)
deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
+ deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
@@ -210,7 +217,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
case t: Throwable => {
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
- metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
+ for (m <- metrics) {
+ m.executorRunTime = serviceTime.toInt
+ m.jvmGCTime = getTotalGCTime - startGCTime
+ }
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 1b071a91e5..f91a415e37 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -85,9 +85,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+ Seq("GC Time") ++
Seq("Details")
- val taskTable = listingTable(taskHeaders, taskRow, tasks)
+ val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -135,7 +136,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
- def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
+ def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
+ (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
@@ -144,6 +146,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
else metrics.map(m => m.executorRunTime).getOrElse(1)
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+ val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
<tr>
<td>{info.taskId}</td>
@@ -154,10 +157,17 @@ private[spark] class StagePage(parent: JobProgressUI) {
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
- {metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
- <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
- {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
- <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
+ {if (shuffleRead) {
+ <td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
+ Utils.memoryBytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
+ }}
+ {if (shuffleWrite) {
+ <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+ Utils.memoryBytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
+ }}
+ <td sorttable_customkey={gcTime.toString}>
+ {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+ </td>
<td>{exception.map(e =>
<span>
{e.className} ({e.description})<br/>
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 5068a025fa..19b07cceda 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -49,13 +49,6 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
</table>
}
- private def getElapsedTime(submitted: Option[Long], completed: Long): String = {
- submitted match {
- case Some(t) => parent.formatDuration(completed - t)
- case _ => "Unknown"
- }
- }
-
private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
@@ -98,6 +91,8 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
val description = listener.stageToDescription.get(s)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+ val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
+ val duration = s.submissionTime.map(t => finishTime - t)
<tr>
<td>{s.id}</td>
@@ -106,8 +101,9 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
}
<td>{description}</td>
<td valign="middle">{submissionTime}</td>
- <td>{getElapsedTime(s.submissionTime,
- s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+ <td sorttable_customkey={duration.getOrElse(-1).toString}>
+ {duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")}
+ </td>
<td class="progress-cell">
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
</td>
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
index 4c3ee12c98..40f94b42a6 100644
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -123,7 +123,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
<td>
{Utils.memoryBytesToString(status.memUsed(prefix))}
- ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
+ ({Utils.memoryBytesToString(status.memRemaining)} Remaining)
</td>
<td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
</tr>