diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala | 12 |
2 files changed, 7 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index ab97d2e4b8..5b493f470b 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -203,8 +203,7 @@ private[spark] class DiskBlockObjectWriter( numRecordsWritten += 1 writeMetrics.incRecordsWritten(1) - // TODO: call updateBytesWritten() less frequently. - if (numRecordsWritten % 32 == 0) { + if (numRecordsWritten % 16384 == 0) { updateBytesWritten() } } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index 8eff3c2970..ec4ef4b2fc 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -53,13 +53,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.bytesWritten == 0) - // After 32 writes, metrics should update - for (i <- 0 until 32) { + // After 16384 writes, metrics should update + for (i <- 0 until 16384) { writer.flush() writer.write(Long.box(i), Long.box(i)) } assert(writeMetrics.bytesWritten > 0) - assert(writeMetrics.recordsWritten === 33) + assert(writeMetrics.recordsWritten === 16385) writer.commitAndClose() assert(file.length() == writeMetrics.bytesWritten) } @@ -75,13 +75,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.bytesWritten == 0) - // After 32 writes, metrics should update - for (i <- 0 until 32) { + // After 16384 writes, metrics should update + for (i <- 0 until 16384) { writer.flush() writer.write(Long.box(i), Long.box(i)) } assert(writeMetrics.bytesWritten > 0) - assert(writeMetrics.recordsWritten === 33) + assert(writeMetrics.recordsWritten === 16385) writer.revertPartialWritesAndClose() assert(writeMetrics.bytesWritten == 0) assert(writeMetrics.recordsWritten == 0) |