author     Sital Kedia <skedia@fb.com>        2016-05-27 11:22:39 -0700
committer  Andrew Or <andrew@databricks.com>  2016-05-27 11:22:39 -0700
commit     ce756daa4f012ebdc5a41bf5a89ff11b6dfdab8c (patch)
tree       964120ed4895c8eaf70b73692664d8fdefc7542a /core/src
parent     5bdbedf2201efa6c34392aa9eff709761f027e1d (diff)
[SPARK-15569] Reduce frequency of updateBytesWritten function in Disk…
## What changes were proposed in this pull request?

While profiling a Spark job that spills a large amount of intermediate data, we found that a significant portion of time is spent in the DiskObjectWriter.updateBytesWritten function. The function is called far too frequently to update the number of bytes written to disk; this patch raises the update interval from every 32 records to every 16384 records to avoid that overhead.

## How was this patch tested?

Tested by running the job on a cluster; this change yielded a 20% CPU gain.

Author: Sital Kedia <skedia@fb.com>

Closes #13332 from sitalkedia/DiskObjectWriter.
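To make the pattern concrete, here is a minimal, self-contained Scala sketch of the batched-update idea. The names (BatchedWriter, UpdateInterval) are hypothetical, not Spark's actual DiskBlockObjectWriter; it assumes the byte count is derived from a file-channel position query, which is the kind of per-record cost the patch amortizes.

```scala
import java.io.{File, FileOutputStream}

// Hypothetical sketch of the batching pattern this patch applies: the
// channel position query is relatively expensive, so sample it once every
// UpdateInterval records instead of on every write.
class BatchedWriter(file: File) {
  private val UpdateInterval = 16384          // interval introduced by this patch
  private val out = new FileOutputStream(file)
  private val channel = out.getChannel
  private var numRecordsWritten = 0L
  private var bytesWritten = 0L               // stands in for writeMetrics
  private var reportedPosition = 0L

  def write(record: Array[Byte]): Unit = {
    out.write(record)
    numRecordsWritten += 1
    // Only every UpdateInterval-th record pays for the position() query.
    if (numRecordsWritten % UpdateInterval == 0) {
      updateBytesWritten()
    }
  }

  // Reconcile the running byte count from the channel's current position.
  private def updateBytesWritten(): Unit = {
    val pos = channel.position()
    bytesWritten += pos - reportedPosition
    reportedPosition = pos
  }

  def commitAndClose(): Unit = {
    updateBytesWritten()                      // final update keeps the total exact
    out.close()
  }
}
```

The final update in commitAndClose() is what keeps the batching safe: between samples the running total can lag, but it is exact once the writer is committed, matching the test suite's file.length() == writeMetrics.bytesWritten assertion below.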
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala       |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala  | 12
2 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index ab97d2e4b8..5b493f470b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -203,8 +203,7 @@ private[spark] class DiskBlockObjectWriter(
     numRecordsWritten += 1
     writeMetrics.incRecordsWritten(1)
 
-    // TODO: call updateBytesWritten() less frequently.
-    if (numRecordsWritten % 32 == 0) {
+    if (numRecordsWritten % 16384 == 0) {
       updateBytesWritten()
     }
   }
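One consequence of the larger interval, as a back-of-the-envelope bound (the 100-byte record size is an illustrative assumption, not a figure from the patch): between updates, writeMetrics.bytesWritten can lag the true file size by up to 16384 records' worth of bytes.

```scala
// Worst-case staleness of writeMetrics.bytesWritten between updates.
// The record size is an assumed value for illustration only.
object StalenessBound extends App {
  val updateIntervalRecords = 16384
  val assumedRecordSizeBytes = 100
  val maxLagBytes = updateIntervalRecords.toLong * assumedRecordSizeBytes
  println(f"bytesWritten may lag by up to $maxLagBytes bytes (~${maxLagBytes / 1024.0 / 1024.0}%.2f MiB)")
}
```

The tests below tolerate this lag: they assert bytesWritten == 0 immediately after the first write and only require a positive value once the 16384-write loop has forced an update.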
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 8eff3c2970..ec4ef4b2fc 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -53,13 +53,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.commitAndClose()
     assert(file.length() == writeMetrics.bytesWritten)
   }
@@ -75,13 +75,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.revertPartialWritesAndClose()
     assert(writeMetrics.bytesWritten == 0)
     assert(writeMetrics.recordsWritten == 0)