author    root <root@ip-10-29-232-35.ec2.internal>  2012-09-23 05:56:44 +0000
committer root <root@ip-10-29-232-35.ec2.internal>  2012-09-23 05:56:44 +0000
commit    e41cab04cafdb610f108a03a9e062d422f2eedd8 (patch)
tree      0540632b33ada6e7fa347dd753b6abd797d69cbd /core
parent    33fb373e69b5cd437257d5260fb90db7bcec8285 (diff)
Avoid creating an extra buffer when saving a stream of values as DISK_ONLY
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala | 44
1 file changed, 24 insertions(+), 20 deletions(-)
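
The patch below streams serialized values straight to disk instead of first materializing the whole block in memory: putValues previously called dataSerialize, building a ByteBuffer holding the entire serialized block, and only then wrote that buffer out, whereas the new version serializes through a FastBufferedOutputStream wrapped around a FileOutputStream. As a minimal standalone sketch of that write pattern, using plain JDK serialization in place of Spark's pluggable Serializer (writeBlock is a hypothetical helper for illustration, not part of this patch):

import java.io.{BufferedOutputStream, File, FileOutputStream, ObjectOutputStream}

// Hypothetical helper: each value is serialized and written as it is
// produced, so no intermediate buffer holding the whole serialized
// block is ever allocated.
def writeBlock(file: File, values: Iterator[AnyRef]) {
  val out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
  try {
    values.foreach(out.writeObject)
  } finally {
    out.close()
  }
}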
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index febb0c0240..d505df66a7 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.io.{File, RandomAccessFile}
+import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{LinkedHashMap, UUID}
@@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
import spark.{Utils, Logging, Serializer, SizeEstimator}
/**
* Abstract class to store blocks
*/
-abstract class BlockStore(blockManager: BlockManager) extends Logging {
+abstract class BlockStore(val blockManager: BlockManager) extends Logging {
initLogging()
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
@@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = createFile(blockId)
- if (file != null) {
- val channel = new RandomAccessFile(file, "rw").getChannel()
- val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
- buffer.put(bytes)
- channel.close()
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
- blockId, bytes.limit, (finishTime - startTime)))
- } else {
- logError("File not created for block " + blockId)
- }
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+ buffer.put(bytes)
+ channel.close()
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+ blockId, bytes.limit, (finishTime - startTime)))
}
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
- : Either[Iterator[Any], ByteBuffer] = {
- val bytes = dataSerialize(values)
- logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes")
- putBytes(blockId, bytes, level)
- return Right(bytes)
+ : Either[Iterator[Any], ByteBuffer] = {
+
+ logDebug("Attempting to write values for block " + blockId)
+ val file = createFile(blockId)
+ val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
+ val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
+ objOut.writeAll(values)
+ objOut.close()
+
+ // Return a byte buffer for the contents of the file
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
}
def getBytes(blockId: String): Option[ByteBuffer] = {
@@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
newFile.getParentFile.mkdirs()
return newFile
} else {
- logError("File for block " + blockId + " already exists on disk, " + file)
- return null
+ throw new Exception("File for block " + blockId + " already exists on disk, " + file)
}
}
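
After writing, the new putValues memory-maps the file to produce the ByteBuffer it returns, so the caller sees the block's bytes without a separately allocated heap copy. A sketch of that read-back step under the same assumptions (readBlock is an illustrative name; the patch itself maps in READ_WRITE mode over an "rw" channel, while this sketch maps read-only):

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

// Map the on-disk block back into memory; the mapping shares pages with
// the file rather than duplicating its contents, and it remains valid
// after the channel is closed.
def readBlock(file: File): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel()
  try {
    channel.map(MapMode.READ_ONLY, 0, channel.size())
  } finally {
    channel.close()
  }
}

The last hunk makes the related failure mode explicit: createFile now throws when a block's file already exists instead of returning null, so putValues no longer needs the null check that the old putBytes carried.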