author      Patrick Wendell <pwendell@gmail.com>    2013-10-07 15:48:17 -0700
committer   Patrick Wendell <pwendell@gmail.com>    2013-10-07 16:30:25 -0700
commit      b08306c5cf015f6f2ca3b808cda127d438b80fc8 (patch)
tree        c3b4c4e27a94fcde1b24577e98db50465b57fbd1 /core
parent      524d01ea31234f9f91e3ef7ef696b7ce76e1997c (diff)
download    spark-b08306c5cf015f6f2ca3b808cda127d438b80fc8.tar.gz
            spark-b08306c5cf015f6f2ca3b808cda127d438b80fc8.tar.bz2
            spark-b08306c5cf015f6f2ca3b808cda127d438b80fc8.zip
Minor cleanup
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala   |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala   | 24
-rw-r--r--  core/src/main/scala/spark/storage/StoragePerfTester.scala      | 85
3 files changed, 45 insertions, 66 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c10bf6069c..e6d6190bc3 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -48,7 +48,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
def timeWriting = _timeWriting
-
private var _timeWriting = 0L
private def callWithTiming(f: => Unit) = {
@@ -88,6 +87,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
if (syncWrites) {
+ // Force outstanding writes to disk and track how long it takes
val start = System.nanoTime()
objOut.flush()
fos.getFD.sync()
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 701bc64a8b..b7c81d091c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
Seq("GC Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
Seq("Errors")
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -152,22 +152,6 @@ private[spark] class StagePage(parent: JobProgressUI) {
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
-
- val remoteBytesRead: Option[Long] = metrics.flatMap{m => m.shuffleReadMetrics}.map(r => r.remoteBytesRead)
- val shuffleBytesWritten: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.map(r => r.shuffleBytesWritten)
-
- val writeThroughput: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.flatMap{ s=>
- val bytesWritten = s.shuffleBytesWritten
- val timeTaken = s.shuffleWriteTime
- val timeSeconds = timeTaken / (1000 * 1000 * 1000.0)
- if (bytesWritten < 10000 || timeSeconds < .01) { // Too little data to form a useful average
- None
- } else {
- Some((bytesWritten / timeSeconds).toLong)
- }
- }
- val writeThroughputStr = writeThroughput.map(t => " (%s/s)".format(Utils.bytesToString(t)))
-
<tr>
<td>{info.taskId}</td>
<td>{info.status}</td>
@@ -185,10 +169,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
}}
{if (shuffleWrite) {
+ <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+ parent.formatDuration(s.shuffleWriteTime / (1000 * 1000))}.getOrElse("")}</td>
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
- Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}
- {writeThroughputStr.getOrElse("")}
- </td>
+ Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
}}
<td>{exception.map(e =>
<span>
diff --git a/core/src/main/scala/spark/storage/StoragePerfTester.scala b/core/src/main/scala/spark/storage/StoragePerfTester.scala
index 6ecd936bbc..5f30383fd0 100644
--- a/core/src/main/scala/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/spark/storage/StoragePerfTester.scala
@@ -1,28 +1,25 @@
-package spark.storage
+package org.apache.spark.storage
-import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CountDownLatch, Executors}
-import spark.{KryoSerializer, SparkContext, Utils}
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
-/** Utility for micro-benchmarking storage performance. */
+/** Utility for micro-benchmarking shuffle write performance.
+ *
+ * Writes simulated shuffle output from several threads and records the observed throughput*/
object StoragePerfTester {
- /** Writing shuffle data from several concurrent tasks and measure throughput. */
def main(args: Array[String]) = {
- def intArg(key: String, default: Int) = Option(System.getenv(key)).map(_.toInt).getOrElse(default)
- def stringArg(key: String, default: String) = Option(System.getenv(key)).getOrElse(default)
-
- /** Total number of simulated shuffles to run. */
- val numShuffles = intArg("NUM_SHUFFLES", 1)
-
- /** Total amount of data to generate, will be distributed evenly amongst maps and reduce splits. */
- val dataSizeMb = Utils.memoryStringToMb(stringArg("OUTPUT_DATA", "1g"))
+ /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+ val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
/** Number of map tasks. All tasks execute concurrently. */
- val numMaps = intArg("NUM_MAPS", 8)
+ val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
/** Number of reduce splits for each map task. */
- val numOutputSplits = intArg("NUM_REDUCERS", 500)
+ val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
val recordLength = 1000 // ~1KB records
val totalRecords = dataSizeMb * 1000
@@ -34,11 +31,13 @@ object StoragePerfTester {
System.setProperty("spark.shuffle.compress", "false")
System.setProperty("spark.shuffle.sync", "true")
+ // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
val sc = new SparkContext("local[4]", "Write Tester")
val blockManager = sc.env.blockManager
- def writeOutputBytes(mapId: Int, shuffleId: Int, total: AtomicLong) = {
- val shuffle = blockManager.shuffleBlockManager.forShuffle(shuffleId, numOutputSplits, new KryoSerializer())
+ def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+ val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
+ new KryoSerializer())
val buckets = shuffle.acquireWriters(mapId)
for (i <- 1 to recordsPerMap) {
buckets.writers(i % numOutputSplits).write(writeData)
@@ -52,36 +51,32 @@ object StoragePerfTester {
shuffle.releaseWriters(buckets)
}
- for (shuffle <- 1 to numShuffles) {
- val start = System.currentTimeMillis()
- val latch = new CountDownLatch(numMaps)
- val totalBytes = new AtomicLong()
- for (task <- 1 to numMaps) {
- executor.submit(new Runnable() {
- override def run() = {
- try {
- writeOutputBytes(task, shuffle, totalBytes)
- latch.countDown()
- } catch {
- case e: Exception =>
- println("Exception in child thread: " + e + " " + e.getMessage)
- System.exit(1)
- }
+ val start = System.currentTimeMillis()
+ val latch = new CountDownLatch(numMaps)
+ val totalBytes = new AtomicLong()
+ for (task <- 1 to numMaps) {
+ executor.submit(new Runnable() {
+ override def run() = {
+ try {
+ writeOutputBytes(task, totalBytes)
+ latch.countDown()
+ } catch {
+ case e: Exception =>
+ println("Exception in child thread: " + e + " " + e.getMessage)
+ System.exit(1)
}
- })
- }
- latch.await()
- val end = System.currentTimeMillis()
- val time = (end - start) / 1000.0
- val bytesPerSecond = totalBytes.get() / time
- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
- System.err.println("bytes_per_file\t\t%s".format(Utils.memoryBytesToString(bytesPerFile)))
- System.err.println("agg_throughput\t\t%s/s".format(Utils.memoryBytesToString(bytesPerSecond.toLong)))
- System.err.println("Shuffle %s is finished in %ss. To run next shuffle, press Enter:".format(shuffle, time))
- readLine()
+ }
+ })
}
+ latch.await()
+ val end = System.currentTimeMillis()
+ val time = (end - start) / 1000.0
+ val bytesPerSecond = totalBytes.get() / time
+ val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+ System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+ System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+ System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
executor.shutdown()
sc.stop()