diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-08-15 12:27:04 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-10-07 15:15:42 -0700 |
commit | a224c8c9b80d1300a0826d6d95521543441580ea (patch) | |
tree | 4dc557c8461e74c3c44a3bb8f9adaa589f7fe1fe | |
parent | 3478ca676289f5eabf5dcaa6f80c6bc203cd3f41 (diff) | |
download | spark-a224c8c9b80d1300a0826d6d95521543441580ea.tar.gz spark-a224c8c9b80d1300a0826d6d95521543441580ea.tar.bz2 spark-a224c8c9b80d1300a0826d6d95521543441580ea.zip |
Adding option to force sync to the filesystem
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 20 |
1 files changed, 17 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index d053958e23..2a246d7808 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -63,17 +63,20 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } private val f: File = createFile(blockId /*, allowAppendExisting */) + private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean // The file channel, used for repositioning / truncating the file. private var channel: FileChannel = null private var bs: OutputStream = null + private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null private var lastValidPosition = 0L private var initialized = false + private var syncTime = 0L override def open(): DiskBlockObjectWriter = { - val fos = new FileOutputStream(f, true) + fos = new FileOutputStream(f, true) ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize)) @@ -84,9 +87,19 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def close() { if (initialized) { - objOut.close() + if (syncWrites) { + val start = System.nanoTime() + objOut.flush() + fos.getFD.sync() + objOut.close() + syncTime += System.nanoTime() - start + } else { + objOut.close() + } + channel = null bs = null + fos = null ts = null objOut = null } @@ -132,7 +145,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def size(): Long = lastValidPosition override def timeWriting: Long = { - Option(ts).map(t => t.timeWriting).getOrElse(0L) // ts could be null if never written to + // ts could be null if never written to + Option(ts).map(t => t.timeWriting).getOrElse(0L) + syncTime } } |