author    Reynold Xin <rxin@cs.berkeley.edu>  2013-04-29 23:33:56 -0700
committer Reynold Xin <rxin@cs.berkeley.edu>  2013-04-29 23:33:56 -0700
commit    1055785a836ab2361239f0937a1a22fee953e029 (patch)
tree      b56c1b4abdb8755622bcc7f00ade5f7955f13e5a /core/src/main
parent    7007201201981c6fb002e3008d97a6d6248f4dba (diff)
Allow specifying the shuffle write file buffer size. The default buffer size
in FastBufferedOutputStream is 8KB, which is too small and causes a lot of disk seeks.
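
As a quick usage sketch (hypothetical value; the property name and its 100KB
default come from the ShuffleBlockManager change below), a job could raise the
buffer before its shuffle stages run:

    // Hypothetical tuning sketch: raise the shuffle write buffer from the
    // 100KB default to 1MB. ShuffleBlockManager reads the value in KB and
    // converts it to bytes when acquiring writers for a map task.
    System.setProperty("spark.shuffle.file.buffer.kb", "1024")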
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala        |  5 +++--
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala           | 10 +++++-----
-rw-r--r--  core/src/main/scala/spark/storage/ShuffleBlockManager.scala |  3 ++-
3 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index b94d729923..6e0ca9204d 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -513,8 +513,9 @@ class BlockManager(
* This is currently used for writing shuffle files out. Callers should handle error
* cases.
*/
- def getDiskBlockWriter(blockId: String, serializer: Serializer): BlockObjectWriter = {
- val writer = diskStore.getBlockWriter(blockId, serializer)
+ def getDiskBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int)
+ : BlockObjectWriter = {
+ val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize)
writer.registerCloseEventHandler(() => {
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo)
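
For orientation, here is a minimal sketch of the BlockObjectWriter contract
this change programs against, reconstructed from the calls visible in this
diff (the abstract class itself is not part of the patch, so the signatures
are inferred):

    // Inferred interface sketch; only the members referenced in this diff.
    abstract class BlockObjectWriter(val blockId: String) {
      def open(): BlockObjectWriter                        // start writing the block file
      def commit(): Long                                   // flush and return bytes written
      def revertPartialWrites(): Unit                      // discard writes since last commit
      def registerCloseEventHandler(f: () => Unit): Unit   // bookkeeping to run on close
    }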
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index f23cd5475f..4cddcc86fc 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -21,7 +21,7 @@ import spark.serializer.{Serializer, SerializationStream}
private class DiskStore(blockManager: BlockManager, rootDirs: String)
extends BlockStore(blockManager) {
- class DiskBlockObjectWriter(blockId: String, serializer: Serializer)
+ class DiskBlockObjectWriter(blockId: String, serializer: Serializer, bufferSize: Int)
extends BlockObjectWriter(blockId) {
private val f: File = createFile(blockId /*, allowAppendExisting */)
@@ -32,7 +32,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
private var validLength = 0L
override def open(): DiskBlockObjectWriter = {
- println("------------------------------------------------- opening " + f)
repositionableStream = new FastBufferedOutputStream(new FileOutputStream(f))
bs = blockManager.wrapForCompression(blockId, repositionableStream)
objOut = serializer.newInstance().serializeStream(bs)
@@ -55,7 +54,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
// Return the number of bytes written for this commit.
override def commit(): Long = {
bs.flush()
- repositionableStream.position()
+ validLength = repositionableStream.position()
+ validLength
}
override def revertPartialWrites() {
@@ -86,8 +86,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
addShutdownHook()
- def getBlockWriter(blockId: String, serializer: Serializer): BlockObjectWriter = {
- new DiskBlockObjectWriter(blockId, serializer)
+ def getBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int): BlockObjectWriter = {
+ new DiskBlockObjectWriter(blockId, serializer, bufferSize)
}
override def getSize(blockId: String): Long = {
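
The commit() change above stores the stream position in validLength rather
than only returning it, which gives revertPartialWrites a known-good offset to
roll back to. The body of revertPartialWrites is not shown in this diff; a
sketch of how it could use the recorded offset (an assumption, not the
patch's code):

    // Hypothetical revert sketch: truncate the file back to the offset
    // recorded by the last commit(), discarding any partial writes.
    override def revertPartialWrites() {
      objOut.close()
      val raf = new java.io.RandomAccessFile(f, "rw")
      raf.setLength(validLength)   // validLength was set in commit()
      raf.close()
    }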
diff --git a/core/src/main/scala/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/spark/storage/ShuffleBlockManager.scala
index 2b1138e7a0..2b22dad459 100644
--- a/core/src/main/scala/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/spark/storage/ShuffleBlockManager.scala
@@ -25,9 +25,10 @@ class ShuffleBlockManager(blockManager: BlockManager) {
// Get a group of writers for a map task.
def acquireWriters(mapId: Int): ShuffleWriterGroup = {
+ val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)
- blockManager.getDiskBlockWriter(blockId, serializer).open()
+ blockManager.getDiskBlockWriter(blockId, serializer, bufferSize).open()
}
new ShuffleWriterGroup(mapId, writers)
}
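
A back-of-envelope sketch of the per-map-task memory this knob controls
(bucket count assumed for illustration): each of the numBuckets writers gets
its own buffer, so the total grows linearly with the number of reduce buckets.

    // Hypothetical sizing sketch: 200 reduce buckets at the 100KB default
    // means 200 * 100 * 1024 = 20,480,000 bytes (~20MB) of write buffers
    // held by a single map task.
    val bufferKb   = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt
    val numBuckets = 200   // assumed reducer count, for illustration only
    val totalBytes = numBuckets.toLong * bufferKb * 1024
    println("shuffle write buffers per map task: " + totalBytes + " bytes")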