author     Patrick Wendell <pwendell@gmail.com>    2013-10-20 22:20:32 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2013-10-20 22:20:32 -0700
commit     35886f347466b25625d5391c97c2deb8293ebc66 (patch)
tree       2a77302e3c1caa6615089507278c6e10eaeaf5b1 /core
parent     5b9380e0173b3d3d13235ae912e9ccc2a974b98b (diff)
parent     9e9e9e1b42df26244d29b8920a41177e296a85c4 (diff)
Merge pull request #41 from pwendell/shuffle-benchmark
Provide Instrumentation for Shuffle Write Performance

Shuffle write performance can have a major impact on the performance of jobs. This patch adds a few pieces of instrumentation related to shuffle writes. They are:

1. A listing of the time spent performing blocking writes for each task. This is implemented by keeping track of the aggregate delay seen by many individual writes.
2. An undocumented option `spark.shuffle.sync` which forces shuffle data to sync to disk. This is necessary for measuring shuffle performance in the absence of the OS buffer cache.
3. An internal utility which micro-benchmarks write throughput for simulated shuffle outputs.

I'm going to do some performance testing on this to see whether these small timing calls add overhead. From a feature perspective, however, I consider this complete. Any feedback is appreciated.
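For context, a minimal sketch of how the undocumented option could be switched on (the property name comes from the patch below; setting it as a system property before the SparkContext is created mirrors how StoragePerfTester does it, and the app name here is illustrative):

  // Force shuffle writes to sync to disk so write timings are not absorbed by the OS buffer cache.
  System.setProperty("spark.shuffle.sync", "true")
  val sc = new org.apache.spark.SparkContext("local[4]", "sync-write-test")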
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala      |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala  |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala         | 44
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala         |  4
-rw-r--r--  core/src/main/scala/spark/storage/StoragePerfTester.scala            | 84
6 files changed, 141 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index f311141148..0b4892f98f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -102,4 +102,9 @@ class ShuffleWriteMetrics extends Serializable {
* Number of bytes written for a shuffle
*/
var shuffleBytesWritten: Long = _
+
+ /**
+ * Time spent blocking on writes to disk or buffer cache, in nanoseconds.
+ */
+ var shuffleWriteTime: Long = _
}
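The new counter is recorded in nanoseconds. A hedged sketch of reading it from a task's metrics (shuffleWriteMetrics is an Option[ShuffleWriteMetrics], as the ShuffleMapTask and StagePage changes below show; the conversion to milliseconds matches the UI code):

  // Hypothetical consumer of the new metric: milliseconds spent blocked on shuffle writes, if recorded.
  def writeTimeMs(metrics: org.apache.spark.executor.TaskMetrics): Option[Long] =
    metrics.shuffleWriteMetrics.map(m => m.shuffleWriteTime / (1000 * 1000))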
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 802791797a..40baea69e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -164,17 +164,20 @@ private[spark] class ShuffleMapTask(
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
+ var totalTime = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.size()
totalBytes += size
+ totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}
// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
+ shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
new MapStatus(blockManager.blockManagerId, compressedSizes)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 2a67800c45..76c92cefd8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -62,4 +62,9 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
* Size of the valid writes, in bytes.
*/
def size(): Long
+
+ /**
+ * Cumulative time spent performing blocking writes, in ns.
+ */
+ def timeWriting(): Long
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index b7ca61e938..2a9a3f61bd 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -45,19 +45,40 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
class DiskBlockObjectWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int)
extends BlockObjectWriter(blockId) {
+ /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
+ private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
+ def timeWriting = _timeWriting
+ private var _timeWriting = 0L
+
+ private def callWithTiming(f: => Unit) = {
+ val start = System.nanoTime()
+ f
+ _timeWriting += (System.nanoTime() - start)
+ }
+
+ def write(i: Int): Unit = callWithTiming(out.write(i))
+ override def write(b: Array[Byte]) = callWithTiming(out.write(b))
+ override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+ }
+
private val f: File = createFile(blockId /*, allowAppendExisting */)
+ private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
// The file channel, used for repositioning / truncating the file.
private var channel: FileChannel = null
private var bs: OutputStream = null
+ private var fos: FileOutputStream = null
+ private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var lastValidPosition = 0L
private var initialized = false
+ private var _timeWriting = 0L
override def open(): DiskBlockObjectWriter = {
- val fos = new FileOutputStream(f, true)
+ fos = new FileOutputStream(f, true)
+ ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
- bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos, bufferSize))
+ bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
@@ -65,9 +86,23 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
- objOut.close()
+ if (syncWrites) {
+ // Force outstanding writes to disk and track how long it takes
+ objOut.flush()
+ val start = System.nanoTime()
+ fos.getFD.sync()
+ _timeWriting += System.nanoTime() - start
+ objOut.close()
+ } else {
+ objOut.close()
+ }
+
+ _timeWriting += ts.timeWriting
+
channel = null
bs = null
+ fos = null
+ ts = null
objOut = null
}
// Invoke the close callback handler.
@@ -110,6 +145,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
override def size(): Long = lastValidPosition
+
+ // Only valid if called after close()
+ override def timeWriting = _timeWriting
}
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
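The TimeTrackingOutputStream added above is a small, reusable pattern: wrap any OutputStream and accumulate the nanoseconds spent inside blocking write calls. A self-contained sketch of the same idea, under illustrative names that are not part of the patch:

  import java.io.{ByteArrayOutputStream, OutputStream}

  // Wraps a sink and accumulates time spent inside write calls. Not thread safe.
  class TimedOutputStream(out: OutputStream) extends OutputStream {
    private var writeNanos = 0L
    def timeWriting: Long = writeNanos

    private def timed(f: => Unit): Unit = {
      val start = System.nanoTime()
      f
      writeNanos += System.nanoTime() - start
    }

    override def write(i: Int): Unit = timed(out.write(i))
    override def write(b: Array[Byte]): Unit = timed(out.write(b))
    override def write(b: Array[Byte], off: Int, len: Int): Unit = timed(out.write(b, off, len))
  }

  // Usage: write through the wrapper, then read the accumulated blocking time.
  val timed = new TimedOutputStream(new ByteArrayOutputStream())
  timed.write("some shuffle bytes".getBytes)
  println("blocked for " + timed.timeWriting + " ns")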
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 163a3746ea..b7c81d091c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
Seq("GC Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
Seq("Errors")
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -169,6 +169,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
}}
{if (shuffleWrite) {
+ <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+ parent.formatDuration(s.shuffleWriteTime / (1000 * 1000))}.getOrElse("")}</td>
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
}}
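A quick check of the conversion used above: shuffleWriteTime is stored in nanoseconds, so dividing by 1000 * 1000 yields milliseconds before the value is handed to formatDuration; a task that blocked for 2,500,000,000 ns, for example, is passed in as 2500 ms.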
diff --git a/core/src/main/scala/spark/storage/StoragePerfTester.scala b/core/src/main/scala/spark/storage/StoragePerfTester.scala
new file mode 100644
index 0000000000..5f30383fd0
--- /dev/null
+++ b/core/src/main/scala/spark/storage/StoragePerfTester.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.storage
+
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CountDownLatch, Executors}
+
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
+
+/** Utility for micro-benchmarking shuffle write performance.
+ *
+ * Writes simulated shuffle output from several threads and records the observed throughput. */
+object StoragePerfTester {
+ def main(args: Array[String]) = {
+ /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+ val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
+
+ /** Number of map tasks. All tasks execute concurrently. */
+ val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
+
+ /** Number of reduce splits for each map task. */
+ val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
+
+ val recordLength = 1000 // ~1KB records
+ val totalRecords = dataSizeMb * 1000
+ val recordsPerMap = totalRecords / numMaps
+
+ val writeData = "1" * recordLength
+ val executor = Executors.newFixedThreadPool(numMaps)
+
+ System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.shuffle.sync", "true")
+
+ // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
+ val sc = new SparkContext("local[4]", "Write Tester")
+ val blockManager = sc.env.blockManager
+
+ def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+ val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
+ new KryoSerializer())
+ val buckets = shuffle.acquireWriters(mapId)
+ for (i <- 1 to recordsPerMap) {
+ buckets.writers(i % numOutputSplits).write(writeData)
+ }
+ buckets.writers.map {w =>
+ w.commit()
+ total.addAndGet(w.size())
+ w.close()
+ }
+
+ shuffle.releaseWriters(buckets)
+ }
+
+ val start = System.currentTimeMillis()
+ val latch = new CountDownLatch(numMaps)
+ val totalBytes = new AtomicLong()
+ for (task <- 1 to numMaps) {
+ executor.submit(new Runnable() {
+ override def run() = {
+ try {
+ writeOutputBytes(task, totalBytes)
+ latch.countDown()
+ } catch {
+ case e: Exception =>
+ println("Exception in child thread: " + e + " " + e.getMessage)
+ System.exit(1)
+ }
+ }
+ })
+ }
+ latch.await()
+ val end = System.currentTimeMillis()
+ val time = (end - start) / 1000.0
+ val bytesPerSecond = totalBytes.get() / time
+ val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+ System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+ System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+ System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
+
+ executor.shutdown()
+ sc.stop()
+ }
+}
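A hedged usage note: the benchmark reads its parameters from the OUTPUT_DATA, NUM_MAPS and NUM_REDUCERS environment variables above, so a run could look like the following (the spark-class launcher is an assumption about the surrounding build, not something this patch adds):

  OUTPUT_DATA=2g NUM_MAPS=8 NUM_REDUCERS=500 ./spark-class org.apache.spark.storage.StoragePerfTester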