path: root/core
author     Patrick Wendell <pwendell@gmail.com>  2013-08-14 18:17:07 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2013-10-07 15:15:41 -0700
commit     3478ca676289f5eabf5dcaa6f80c6bc203cd3f41 (patch)
tree       b57f03b2527efec1056be5d9f51b8c372e660b4c /core
parent     3745a1827fc955be6c3236e4c31d27db062f15de (diff)
Track and report write throughput for shuffle tasks.
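The patch adds a shuffleWriteTime field (in nanoseconds) to ShuffleWriteMetrics, a timeWriting() accessor on BlockObjectWriter, and a TimeTrackingOutputStream inside DiskStore's DiskBlockObjectWriter that wraps the underlying FileOutputStream and accumulates, via System.nanoTime, the time spent blocked in write() calls. ShuffleMapTask sums the per-bucket writer times into the task's metrics, and the stage page derives a throughput of shuffleBytesWritten divided by shuffleWriteTime converted to seconds, appending it as "(N/s)" next to the shuffle write size whenever enough data was written for the average to be meaningful.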
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala      |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala   |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala  |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala          | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala          | 20
5 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index f311141148..0b4892f98f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -102,4 +102,9 @@ class ShuffleWriteMetrics extends Serializable {
* Number of bytes written for a shuffle
*/
var shuffleBytesWritten: Long = _
+
+ /**
+ * Time spent blocking on writes to disk or buffer cache, in nanoseconds.
+ */
+ var shuffleWriteTime: Long = _
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index d23df0dd2b..eb27437231 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -154,8 +154,10 @@ private[spark] class ShuffleMapTask(
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
+ var totalTime = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
writer.commit()
+ totalTime += writer.timeWriting()
writer.close()
val size = writer.size()
totalBytes += size
@@ -165,6 +167,7 @@ private[spark] class ShuffleMapTask(
// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
+ shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
return new MapStatus(blockManager.blockManagerId, compressedSizes)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 39f103297f..de3e3b0864 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -62,4 +62,9 @@ abstract class BlockObjectWriter(val blockId: String) {
* Size of the valid writes, in bytes.
*/
def size(): Long
+
+ /**
+ * Cumulative time spent performing blocking writes, in ns.
+ */
+ def timeWriting(): Long
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 63447baf8c..d053958e23 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -45,19 +45,38 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
class DiskBlockObjectWriter(blockId: String, serializer: Serializer, bufferSize: Int)
extends BlockObjectWriter(blockId) {
+ /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
+ private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
+ def timeWriting = _timeWriting
+
+ private var _timeWriting = 0L
+
+ private def callWithTiming(f: => Unit) = {
+ val start = System.nanoTime()
+ f
+ _timeWriting += (System.nanoTime() - start)
+ }
+
+ def write(i: Int): Unit = callWithTiming(out.write(i))
+ override def write(b: Array[Byte]) = callWithTiming(out.write(b))
+ override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+ }
+
private val f: File = createFile(blockId /*, allowAppendExisting */)
// The file channel, used for repositioning / truncating the file.
private var channel: FileChannel = null
private var bs: OutputStream = null
+ private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var lastValidPosition = 0L
private var initialized = false
override def open(): DiskBlockObjectWriter = {
val fos = new FileOutputStream(f, true)
+ ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
- bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos, bufferSize))
+ bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
@@ -68,6 +87,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
objOut.close()
channel = null
bs = null
+ ts = null
objOut = null
}
// Invoke the close callback handler.
@@ -110,6 +130,10 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
override def size(): Long = lastValidPosition
+
+ override def timeWriting: Long = {
+ Option(ts).map(t => t.timeWriting).getOrElse(0L) // ts could be null if never written to
+ }
}
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
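For reference, the wrap-and-time pattern used by TimeTrackingOutputStream above can be reproduced outside Spark. The following is a minimal, self-contained sketch; it assumes only java.io and System.nanoTime, and the class and object names are illustrative rather than part of this patch:

import java.io.{ByteArrayOutputStream, OutputStream}

// Stand-in for the patch's TimeTrackingOutputStream: every write call is
// bracketed by System.nanoTime and the blocked time is accumulated.
class TimedOutputStream(out: OutputStream) extends OutputStream {
  private var timeWritingNanos = 0L
  def timeWriting: Long = timeWritingNanos

  private def timed(f: => Unit): Unit = {
    val start = System.nanoTime()
    f
    timeWritingNanos += System.nanoTime() - start
  }

  override def write(i: Int): Unit = timed(out.write(i))
  override def write(b: Array[Byte]): Unit = timed(out.write(b))
  override def write(b: Array[Byte], off: Int, len: Int): Unit = timed(out.write(b, off, len))
}

object TimedOutputStreamExample {
  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    val ts = new TimedOutputStream(sink)
    ts.write(new Array[Byte](1024 * 1024))  // write 1 MB, timed
    println(s"bytes=${sink.size()} timeWritingNs=${ts.timeWriting}")
  }
}

In the patch itself the wrapper sits between the FileOutputStream and the buffering/compression layers, so only actual writes to disk (or the buffer cache) are timed, not serialization or compression work.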
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 163a3746ea..701bc64a8b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -152,6 +152,22 @@ private[spark] class StagePage(parent: JobProgressUI) {
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
+
+ val remoteBytesRead: Option[Long] = metrics.flatMap{m => m.shuffleReadMetrics}.map(r => r.remoteBytesRead)
+ val shuffleBytesWritten: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.map(r => r.shuffleBytesWritten)
+
+ val writeThroughput: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.flatMap{ s=>
+ val bytesWritten = s.shuffleBytesWritten
+ val timeTaken = s.shuffleWriteTime
+ val timeSeconds = timeTaken / (1000 * 1000 * 1000.0)
+ if (bytesWritten < 10000 || timeSeconds < .01) { // Too little data to form a useful average
+ None
+ } else {
+ Some((bytesWritten / timeSeconds).toLong)
+ }
+ }
+ val writeThroughputStr = writeThroughput.map(t => " (%s/s)".format(Utils.bytesToString(t)))
+
<tr>
<td>{info.taskId}</td>
<td>{info.status}</td>
@@ -170,7 +186,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
}}
{if (shuffleWrite) {
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
- Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
+ Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}
+ {writeThroughputStr.getOrElse("")}
+ </td>
}}
<td>{exception.map(e =>
<span>
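The throughput rule introduced in StagePage above can be restated as a standalone function. This is an illustrative sketch, not Spark code; the 10000-byte and 0.01-second thresholds are the ones used in the patch:

// Bytes per second, or None when the sample is too small to be meaningful.
def writeThroughputBytesPerSec(bytesWritten: Long, writeTimeNanos: Long): Option[Long] = {
  val seconds = writeTimeNanos / 1e9
  if (bytesWritten < 10000 || seconds < 0.01) None
  else Some((bytesWritten / seconds).toLong)
}

// Worked example: 64 MB written with 2 s of blocked write time:
// writeThroughputBytesPerSec(64L * 1024 * 1024, 2L * 1000 * 1000 * 1000)
//   == Some(33554432) bytes/s, i.e. 32 MB/s.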