diff options
author | root <root@ip-10-29-232-35.ec2.internal> | 2012-09-23 05:56:44 +0000 |
---|---|---|
committer | root <root@ip-10-29-232-35.ec2.internal> | 2012-09-23 05:56:44 +0000 |
commit | e41cab04cafdb610f108a03a9e062d422f2eedd8 (patch) | |
tree | 0540632b33ada6e7fa347dd753b6abd797d69cbd /core | |
parent | 33fb373e69b5cd437257d5260fb90db7bcec8285 (diff) | |
download | spark-e41cab04cafdb610f108a03a9e062d422f2eedd8.tar.gz spark-e41cab04cafdb610f108a03a9e062d422f2eedd8.tar.bz2 spark-e41cab04cafdb610f108a03a9e062d422f2eedd8.zip |
Avoid creating an extra buffer when saving a stream of values as DISK_ONLY
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockStore.scala | 44 |
1 files changed, 24 insertions, 20 deletions
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index febb0c0240..d505df66a7 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -1,6 +1,6 @@ package spark.storage -import java.io.{File, RandomAccessFile} +import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode import java.util.{LinkedHashMap, UUID} @@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} import scala.collection.mutable.ArrayBuffer +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + import spark.{Utils, Logging, Serializer, SizeEstimator} /** * Abstract class to store blocks */ -abstract class BlockStore(blockManager: BlockManager) extends Logging { +abstract class BlockStore(val blockManager: BlockManager) extends Logging { initLogging() def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) @@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to put block " + blockId) val startTime = System.currentTimeMillis val file = createFile(blockId) - if (file != null) { - val channel = new RandomAccessFile(file, "rw").getChannel() - val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) - buffer.put(bytes) - channel.close() - val finishTime = System.currentTimeMillis - logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( - blockId, bytes.limit, (finishTime - startTime))) - } else { - logError("File not created for block " + blockId) - } + val channel = new RandomAccessFile(file, "rw").getChannel() + val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) + buffer.put(bytes) + channel.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( + blockId, bytes.limit, (finishTime - startTime))) } def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) - : Either[Iterator[Any], ByteBuffer] = { - val bytes = dataSerialize(values) - logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes") - putBytes(blockId, bytes, level) - return Right(bytes) + : Either[Iterator[Any], ByteBuffer] = { + + logDebug("Attempting to write values for block " + blockId) + val file = createFile(blockId) + val fileOut = new FastBufferedOutputStream(new FileOutputStream(file)) + val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) + objOut.writeAll(values) + objOut.close() + + // Return a byte buffer for the contents of the file + val channel = new RandomAccessFile(file, "rw").getChannel() + Right(channel.map(MapMode.READ_WRITE, 0, channel.size())) } def getBytes(blockId: String): Option[ByteBuffer] = { @@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) newFile.getParentFile.mkdirs() return newFile } else { - logError("File for block " + blockId + " already exists on disk, " + file) - return null + throw new Exception("File for block " + blockId + " already exists on disk, " + file) } } |