author     Kay Ousterhout <kayousterhout@gmail.com>  2015-03-24 16:29:40 -0700
committer  Andrew Or <andrew@databricks.com>         2015-03-24 16:29:40 -0700
commit     d8ccf655f344eed65cdaf5d9252f1b565b8406ca
tree       a20c4fa0000ec54df2cc6f508bdfd931e629770c /core
parent     6948ab6f8ba836446b005f2cf1cc4abc944c5053
[SPARK-3570] Include time to open files in shuffle write time.
Opening shuffle files can be very significant when the disk is contended, especially when using ext3. While writing data to a file can avoid hitting disk (and instead hit the buffer cache), opening a file always involves writing some metadata about the file to disk, so the open time can be a very significant portion of the shuffle write time. In one job I ran recently, the time to write shuffle data to the file was only 4ms for each task, but the time to open the file was about 100x as long (~400ms).

When we add metrics about spilled data (#2504), we should ensure that the file open time is also included there.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4550 from kayousterhout/SPARK-3570 and squashes the following commits:

ea3a4ae [Kay Ousterhout] Added comment about excluded open time
fdc5185 [Kay Ousterhout] Improved comment
42b7e43 [Kay Ousterhout] Fixed parens for nanotime
2423555 [Kay Ousterhout] [SPARK-3570] Include time to open files in shuffle write time.
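All three files touched below apply the same pattern: capture System.nanoTime before creating the shuffle files and their disk writers, then charge the elapsed time to the shuffle write metrics. The following is a minimal sketch of that pattern outside Spark; WriteMetrics, OpenTimingSketch, and openAll are hypothetical stand-ins for illustration, not Spark's actual API (only incShuffleWriteTime mirrors the method used in the diff):

import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}

// Hypothetical stand-in for Spark's ShuffleWriteMetrics.
class WriteMetrics {
  private var shuffleWriteTimeNanos = 0L
  def incShuffleWriteTime(nanos: Long): Unit = { shuffleWriteTimeNanos += nanos }
  def writeTimeNanos: Long = shuffleWriteTimeNanos
}

object OpenTimingSketch {
  // Opening a file forces filesystem metadata to disk even when the payload
  // writes that follow only touch the OS buffer cache, so the open calls are
  // timed and charged to the write metrics in aggregate.
  def openAll(files: Seq[File], metrics: WriteMetrics): Seq[OutputStream] = {
    val openStartTime = System.nanoTime
    val writers = files.map(f => new BufferedOutputStream(new FileOutputStream(f)))
    metrics.incShuffleWriteTime(System.nanoTime - openStartTime)
    writers
  }
}

Timing the whole batch of opens, rather than each open individually, is what makes this measurable: a single open is fast, but a map task may open one file per reduce bucket, so the aggregate cost adds up.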
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala | 4 ++++
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala  | 3 +++
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala  | 5 +++++
3 files changed, 12 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 660df00bc3..d0178dfde6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null

+      val openStartTime = System.nanoTime
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
         }
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)

       override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index fa2e617762..55ea0f17b1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }

+    // Don't bother including the time to open the merged output file in the shuffle write time,
+    // because it just opens a single file, so is typically too fast to measure accurately
+    // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 3262e670c2..b962c101c9 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -352,6 +352,7 @@ private[spark] class ExternalSorter[K, V, C](
     // Create our file writers if we haven't done so yet
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
+      val openStartTime = System.nanoTime
       partitionWriters = Array.fill(numPartitions) {
         // Because these files may be read during shuffle, their compression must be controlled by
         // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -359,6 +360,10 @@ private[spark] class ExternalSorter[K, V, C](
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
+      curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
     }

     // No need to sort stuff, just write each element out
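For intuition on why aggregate open time is charged in FileShuffleBlockManager and ExternalSorter but deliberately skipped for SortShuffleWriter's single merged output file, a rough micro-benchmark along the following lines can be run. The file count, payload size, and temp-file names are illustrative only, and absolute numbers vary widely by filesystem; the ~100x gap in the commit message was observed on contended ext3:

import java.io.{File, FileOutputStream}

// Rough, illustrative micro-benchmark: compares the aggregate cost of
// opening many files against the cost of small writes to them.
object OpenVsWrite extends App {
  val numFiles = 1000
  val dir = new File(System.getProperty("java.io.tmpdir"))

  val files = (1 to numFiles).map { i =>
    val f = new File(dir, s"open-vs-write-$i.tmp")
    f.deleteOnExit()
    f
  }

  // Opening each file forces filesystem metadata to be written.
  val openStart = System.nanoTime
  val streams = files.map(f => new FileOutputStream(f))
  val openNanos = System.nanoTime - openStart

  // Small writes usually land in the OS buffer cache, not on disk.
  val payload = new Array[Byte](4096)
  val writeStart = System.nanoTime
  streams.foreach(_.write(payload))
  val writeNanos = System.nanoTime - writeStart

  streams.foreach(_.close())
  println(f"open: ${openNanos / 1e6}%.1f ms, write: ${writeNanos / 1e6}%.1f ms")
}

With one file the open cost is a single metadata write and disappears into measurement noise, which is exactly the trade-off the SortShuffleWriter comment describes.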