aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-08-15 12:27:04 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-10-07 15:15:42 -0700
commita224c8c9b80d1300a0826d6d95521543441580ea (patch)
tree4dc557c8461e74c3c44a3bb8f9adaa589f7fe1fe
parent3478ca676289f5eabf5dcaa6f80c6bc203cd3f41 (diff)
downloadspark-a224c8c9b80d1300a0826d6d95521543441580ea.tar.gz
spark-a224c8c9b80d1300a0826d6d95521543441580ea.tar.bz2
spark-a224c8c9b80d1300a0826d6d95521543441580ea.zip
Adding option to force sync to the filesystem
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskStore.scala20
1 files changed, 17 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index d053958e23..2a246d7808 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -63,17 +63,20 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
private val f: File = createFile(blockId /*, allowAppendExisting */)
+ private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
// The file channel, used for repositioning / truncating the file.
private var channel: FileChannel = null
private var bs: OutputStream = null
+ private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var lastValidPosition = 0L
private var initialized = false
+ private var syncTime = 0L
override def open(): DiskBlockObjectWriter = {
- val fos = new FileOutputStream(f, true)
+ fos = new FileOutputStream(f, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
@@ -84,9 +87,19 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
- objOut.close()
+ if (syncWrites) {
+ val start = System.nanoTime()
+ objOut.flush()
+ fos.getFD.sync()
+ objOut.close()
+ syncTime += System.nanoTime() - start
+ } else {
+ objOut.close()
+ }
+
channel = null
bs = null
+ fos = null
ts = null
objOut = null
}
@@ -132,7 +145,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def size(): Long = lastValidPosition
override def timeWriting: Long = {
- Option(ts).map(t => t.timeWriting).getOrElse(0L) // ts could be null if never written to
+ // ts could be null if never written to
+ Option(ts).map(t => t.timeWriting).getOrElse(0L) + syncTime
}
}