diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-28 18:28:13 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-28 18:28:13 -0700 |
commit | ae8c7d6cfaa3196edef03d10e7f84c3d5e6193c5 (patch) | |
tree | e3cbed2cdbad1d92411f0a78654066e6cd47696d | |
parent | 3d7267999dac59c901afcfd52ce3f40d007015ae (diff) | |
download | spark-ae8c7d6cfaa3196edef03d10e7f84c3d5e6193c5.tar.gz spark-ae8c7d6cfaa3196edef03d10e7f84c3d5e6193c5.tar.bz2 spark-ae8c7d6cfaa3196edef03d10e7f84c3d5e6193c5.zip |
Made disk store use multiple directories, deleted ShuffleManager
-rw-r--r-- | core/src/main/scala/spark/ShuffleManager.scala | 98 | ||||
-rw-r--r-- | core/src/main/scala/spark/SparkEnv.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockStore.scala | 313 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/DiskStore.scala | 152 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/MemoryStore.scala | 171 |
6 files changed, 345 insertions, 417 deletions
diff --git a/core/src/main/scala/spark/ShuffleManager.scala b/core/src/main/scala/spark/ShuffleManager.scala deleted file mode 100644 index 24af7f3a08..0000000000 --- a/core/src/main/scala/spark/ShuffleManager.scala +++ /dev/null @@ -1,98 +0,0 @@ -package spark - -import java.io._ -import java.net.URL -import java.util.UUID -import java.util.concurrent.atomic.AtomicLong - -import scala.collection.mutable.{ArrayBuffer, HashMap} - -import spark._ - -class ShuffleManager extends Logging { - private var nextShuffleId = new AtomicLong(0) - - private var shuffleDir: File = null - private var server: HttpServer = null - private var serverUri: String = null - - initialize() - - private def initialize() { - // TODO: localDir should be created by some mechanism common to Spark - // so that it can be shared among shuffle, broadcast, etc - val localDirRoot = System.getProperty("spark.local.dir", "/tmp") - var tries = 0 - var foundLocalDir = false - var localDir: File = null - var localDirUuid: UUID = null - while (!foundLocalDir && tries < 10) { - tries += 1 - try { - localDirUuid = UUID.randomUUID - localDir = new File(localDirRoot, "spark-local-" + localDirUuid) - if (!localDir.exists) { - localDir.mkdirs() - foundLocalDir = true - } - } catch { - case e: Exception => - logWarning("Attempt " + tries + " to create local dir failed", e) - } - } - if (!foundLocalDir) { - logError("Failed 10 attempts to create local dir in " + localDirRoot) - System.exit(1) - } - shuffleDir = new File(localDir, "shuffle") - shuffleDir.mkdirs() - logInfo("Shuffle dir: " + shuffleDir) - - // Add a shutdown hook to delete the local dir - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dir") { - override def run() { - Utils.deleteRecursively(localDir) - } - }) - - val extServerPort = System.getProperty( - "spark.localFileShuffle.external.server.port", "-1").toInt - if (extServerPort != -1) { - // We're using an external HTTP server; set URI relative to its root - var extServerPath = System.getProperty( - "spark.localFileShuffle.external.server.path", "") - if (extServerPath != "" && !extServerPath.endsWith("/")) { - extServerPath += "/" - } - serverUri = "http://%s:%d/%s/spark-local-%s".format( - Utils.localIpAddress, extServerPort, extServerPath, localDirUuid) - } else { - // Create our own server - server = new HttpServer(localDir) - server.start() - serverUri = server.uri - } - logInfo("Local URI: " + serverUri) - } - - def stop() { - if (server != null) { - server.stop() - } - } - - def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { - val dir = new File(shuffleDir, shuffleId + "/" + inputId) - dir.mkdirs() - val file = new File(dir, "" + outputId) - return file - } - - def getServerUri(): String = { - serverUri - } - - def newShuffleId(): Long = { - nextShuffleId.getAndIncrement() - } -} diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 2c9f46b1a0..280c00b486 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -10,6 +10,13 @@ import spark.storage.BlockManagerMaster import spark.network.ConnectionManager import spark.util.AkkaUtils +/** + * Holds all the runtime environment objects for a running Spark instance (either master or worker), + * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently + * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these + * objects needs to have the right SparkEnv set. You can get the current environment with + * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set. + */ class SparkEnv ( val actorSystem: ActorSystem, val cache: Cache, @@ -18,7 +25,6 @@ class SparkEnv ( val cacheTracker: CacheTracker, val mapOutputTracker: MapOutputTracker, val shuffleFetcher: ShuffleFetcher, - val shuffleManager: ShuffleManager, val broadcastManager: BroadcastManager, val blockManager: BlockManager, val connectionManager: ConnectionManager, @@ -27,7 +33,7 @@ class SparkEnv ( /** No-parameter constructor for unit tests. */ def this() = { - this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null, null, null) + this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null, null) } def stop() { @@ -35,7 +41,6 @@ class SparkEnv ( mapOutputTracker.stop() cacheTracker.stop() shuffleFetcher.stop() - shuffleManager.stop() broadcastManager.stop() blockManager.stop() blockManager.master.stop() @@ -88,9 +93,7 @@ object SparkEnv { val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal) val blockManager = new BlockManager(blockManagerMaster, serializer) - val connectionManager = blockManager.connectionManager - - val shuffleManager = new ShuffleManager() + val connectionManager = blockManager.connectionManager val broadcastManager = new BroadcastManager(isMaster) @@ -119,7 +122,6 @@ object SparkEnv { cacheTracker, mapOutputTracker, shuffleFetcher, - shuffleManager, broadcastManager, blockManager, connectionManager, diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 802277a251..ff28d52484 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -355,8 +355,8 @@ private object Utils extends Logging { * This is used, for example, to tell users where in their code each RDD got created. */ def getSparkCallSite: String = { - val trace = Thread.currentThread().getStackTrace().filter( el => - (!el.getMethodName().contains("getStackTrace"))) + val trace = Thread.currentThread.getStackTrace().filter( el => + (!el.getMethodName.contains("getStackTrace"))) // Keep crawling up the stack trace until we find the first function not inside of the spark // package. We track the last (shallowest) contiguous Spark method. This might be an RDD @@ -369,12 +369,12 @@ private object Utils extends Logging { for (el <- trace) { if (!finished) { - if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) { - lastSparkMethod = el.getMethodName() + if (el.getClassName.contains("spark") && !el.getClassName.startsWith("spark.examples")) { + lastSparkMethod = el.getMethodName } else { - firstUserLine = el.getLineNumber() - firstUserFile = el.getFileName() + firstUserLine = el.getLineNumber + firstUserFile = el.getFileName finished = true } } diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index 7696aa9567..64773a3b03 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -1,27 +1,21 @@ package spark.storage -import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer -import java.nio.channels.FileChannel.MapMode -import java.util.{LinkedHashMap, UUID} -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} -import scala.collection.mutable.ArrayBuffer - -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - -import spark.{Utils, Logging, Serializer, SizeEstimator} +import spark.Logging /** * Abstract class to store blocks */ abstract class BlockStore(val blockManager: BlockManager) extends Logging { - initLogging() - - def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) + def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) + /** + * Put in a block and return its content as either bytes or another Iterator. This is used + * to efficiently write the values to multiple locations (e.g. for replication). + */ def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) - : Either[Iterator[Any], ByteBuffer] + : Either[Iterator[Any], ByteBuffer] /** * Return the size of a block. @@ -40,296 +34,3 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { def clear() { } } - -/** - * Class to store blocks in memory - */ -class MemoryStore(blockManager: BlockManager, maxMemory: Long) - extends BlockStore(blockManager) { - - case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) - - private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true) - private var currentMemory = 0L - - //private val blockDropper = Executors.newSingleThreadExecutor() - private val blocksToDrop = new ArrayBlockingQueue[String](10000, true) - private val blockDropper = new Thread("memory store - block dropper") { - override def run() { - try{ - while (true) { - val blockId = blocksToDrop.take() - logDebug("Block " + blockId + " ready to be dropped") - blockManager.dropFromMemory(blockId) - } - } catch { - case ie: InterruptedException => - logInfo("Shutting down block dropper") - } - } - } - blockDropper.start() - logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory))) - - def freeMemory: Long = maxMemory - currentMemory - - def getSize(blockId: String): Long = memoryStore.synchronized { memoryStore.get(blockId).size } - - def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { - if (level.deserialized) { - bytes.rewind() - val values = dataDeserialize(bytes) - val elements = new ArrayBuffer[Any] - elements ++= values - val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - ensureFreeSpace(sizeEstimate) - val entry = new Entry(elements, sizeEstimate, true) - memoryStore.synchronized { memoryStore.put(blockId, entry) } - currentMemory += sizeEstimate - logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format( - blockId, sizeEstimate, freeMemory)) - } else { - val entry = new Entry(bytes, bytes.limit, false) - ensureFreeSpace(bytes.limit) - memoryStore.synchronized { memoryStore.put(blockId, entry) } - currentMemory += bytes.limit - logInfo("Block %s stored as %d bytes to memory (free %d)".format( - blockId, bytes.limit, freeMemory)) - } - } - - def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) - : Either[Iterator[Any], ByteBuffer] = { - if (level.deserialized) { - val elements = new ArrayBuffer[Any] - elements ++= values - val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - ensureFreeSpace(sizeEstimate) - val entry = new Entry(elements, sizeEstimate, true) - memoryStore.synchronized { memoryStore.put(blockId, entry) } - currentMemory += sizeEstimate - logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format( - blockId, sizeEstimate, freeMemory)) - return Left(elements.iterator) - } else { - val bytes = dataSerialize(values) - ensureFreeSpace(bytes.limit) - val entry = new Entry(bytes, bytes.limit, false) - memoryStore.synchronized { memoryStore.put(blockId, entry) } - currentMemory += bytes.limit - logInfo("Block %s stored as %d bytes to memory (free %d)".format( - blockId, bytes.limit, freeMemory)) - return Right(bytes) - } - } - - def getBytes(blockId: String): Option[ByteBuffer] = { - throw new UnsupportedOperationException("Not implemented") - } - - def getValues(blockId: String): Option[Iterator[Any]] = { - val entry = memoryStore.synchronized { memoryStore.get(blockId) } - if (entry == null) { - return None - } - if (entry.deserialized) { - return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) - } else { - return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())) - } - } - - def remove(blockId: String) { - memoryStore.synchronized { - val entry = memoryStore.get(blockId) - if (entry != null) { - memoryStore.remove(blockId) - currentMemory -= entry.size - logInfo("Block %s of size %d dropped from memory (free %d)".format( - blockId, entry.size, freeMemory)) - } else { - logWarning("Block " + blockId + " could not be removed as it doesnt exist") - } - } - } - - override def clear() { - memoryStore.synchronized { - memoryStore.clear() - } - //blockDropper.shutdown() - blockDropper.interrupt() - logInfo("MemoryStore cleared") - } - - private def ensureFreeSpace(space: Long) { - logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( - space, currentMemory, maxMemory)) - - if (maxMemory - currentMemory < space) { - - val selectedBlocks = new ArrayBuffer[String]() - var selectedMemory = 0L - - memoryStore.synchronized { - val iter = memoryStore.entrySet().iterator() - while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) { - val pair = iter.next() - val blockId = pair.getKey - val entry = pair.getValue() - if (!entry.dropPending) { - selectedBlocks += blockId - entry.dropPending = true - } - selectedMemory += pair.getValue.size - logInfo("Block " + blockId + " selected for dropping") - } - } - - logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " + - blocksToDrop.size + " blocks pending") - var i = 0 - while (i < selectedBlocks.size) { - blocksToDrop.add(selectedBlocks(i)) - i += 1 - } - selectedBlocks.clear() - } - } -} - - -/** - * Class to store blocks in disk - */ -class DiskStore(blockManager: BlockManager, rootDirs: String) - extends BlockStore(blockManager) { - - val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - val localDirs = createLocalDirs() - var lastLocalDirUsed = 0 - - addShutdownHook() - - def getSize(blockId: String): Long = { - getFile(blockId).length - } - - def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { - logDebug("Attempting to put block " + blockId) - val startTime = System.currentTimeMillis - val file = createFile(blockId) - val channel = new RandomAccessFile(file, "rw").getChannel() - val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) - buffer.put(bytes) - channel.close() - val finishTime = System.currentTimeMillis - logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( - blockId, bytes.limit, (finishTime - startTime))) - } - - def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) - : Either[Iterator[Any], ByteBuffer] = { - - logDebug("Attempting to write values for block " + blockId) - val file = createFile(blockId) - val fileOut = blockManager.wrapForCompression( - new FastBufferedOutputStream(new FileOutputStream(file))) - val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) - objOut.writeAll(values) - objOut.close() - - // Return a byte buffer for the contents of the file - val channel = new RandomAccessFile(file, "rw").getChannel() - Right(channel.map(MapMode.READ_WRITE, 0, channel.size())) - } - - def getBytes(blockId: String): Option[ByteBuffer] = { - val file = getFile(blockId) - val length = file.length().toInt - val channel = new RandomAccessFile(file, "r").getChannel() - Some(channel.map(MapMode.READ_WRITE, 0, length)) - } - - def getValues(blockId: String): Option[Iterator[Any]] = { - val file = getFile(blockId) - val length = file.length().toInt - val channel = new RandomAccessFile(file, "r").getChannel() - val bytes = channel.map(MapMode.READ_ONLY, 0, length) - val buffer = dataDeserialize(bytes) - channel.close() - return Some(buffer) - } - - def remove(blockId: String) { - throw new UnsupportedOperationException("Not implemented") - } - - private def createFile(blockId: String): File = { - val file = getFile(blockId) - if (file == null) { - lastLocalDirUsed = (lastLocalDirUsed + 1) % localDirs.size - val newFile = new File(localDirs(lastLocalDirUsed), blockId) - newFile.getParentFile.mkdirs() - return newFile - } else { - throw new Exception("File for block " + blockId + " already exists on disk, " + file) - } - } - - private def getFile(blockId: String): File = { - logDebug("Getting file for block " + blockId) - // Search for the file in all the local directories, only one of them should have the file - val files = localDirs.map(localDir => new File(localDir, blockId)).filter(_.exists) - if (files.size > 1) { - throw new Exception("Multiple files for same block " + blockId + " exists: " + - files.map(_.toString).reduceLeft(_ + ", " + _)) - return null - } else if (files.size == 0) { - return null - } else { - logDebug("Got file " + files(0) + " of size " + files(0).length + " bytes") - return files(0) - } - } - - private def createLocalDirs(): Seq[File] = { - logDebug("Creating local directories at root dirs '" + rootDirs + "'") - rootDirs.split("[;,:]").map(rootDir => { - var foundLocalDir: Boolean = false - var localDir: File = null - var localDirUuid: UUID = null - var tries = 0 - while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { - tries += 1 - try { - localDirUuid = UUID.randomUUID() - localDir = new File(rootDir, "spark-local-" + localDirUuid) - if (!localDir.exists) { - localDir.mkdirs() - foundLocalDir = true - } - } catch { - case e: Exception => - logWarning("Attempt " + tries + " to create local dir failed", e) - } - } - if (!foundLocalDir) { - logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + - " attempts to create local dir in " + rootDir) - System.exit(1) - } - logDebug("Created local directory at " + localDir) - localDir - }) - } - - private def addShutdownHook() { - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { - override def run() { - logDebug("Shutdown hook called") - localDirs.foreach(localDir => Utils.deleteRecursively(localDir)) - } - }) - } -} diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala new file mode 100644 index 0000000000..3845cb5d0e --- /dev/null +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -0,0 +1,152 @@ +package spark.storage + +import java.nio.ByteBuffer +import java.io.{File, FileOutputStream, RandomAccessFile} +import java.nio.channels.FileChannel.MapMode +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream +import java.util.UUID +import spark.Utils + +/** + * Stores BlockManager blocks on disk. + */ +class DiskStore(blockManager: BlockManager, rootDirs: String) + extends BlockStore(blockManager) { + + val MAX_DIR_CREATION_ATTEMPTS: Int = 10 + val SUBDIRS_PER_LOCAL_DIR = 128 + + // Create one local directory for each path mentioned in spark.local.dir; then, inside this + // directory, create multiple subdirectories that we will hash files into, in order to avoid + // having really large inodes at the top level. + val localDirs = createLocalDirs() + val subDirs = Array.fill(localDirs.length)(new Array[File](SUBDIRS_PER_LOCAL_DIR)) + + addShutdownHook() + + override def getSize(blockId: String): Long = { + getFile(blockId).length + } + + override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { + logDebug("Attempting to put block " + blockId) + val startTime = System.currentTimeMillis + val file = createFile(blockId) + val channel = new RandomAccessFile(file, "rw").getChannel() + val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) + buffer.put(bytes) + channel.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( + blockId, bytes.limit, (finishTime - startTime))) + } + + override def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) + : Either[Iterator[Any], ByteBuffer] = { + + logDebug("Attempting to write values for block " + blockId) + val file = createFile(blockId) + val fileOut = blockManager.wrapForCompression( + new FastBufferedOutputStream(new FileOutputStream(file))) + val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) + objOut.writeAll(values) + objOut.close() + + // Return a byte buffer for the contents of the file + val channel = new RandomAccessFile(file, "rw").getChannel() + Right(channel.map(MapMode.READ_WRITE, 0, channel.size())) + } + + override def getBytes(blockId: String): Option[ByteBuffer] = { + val file = getFile(blockId) + val length = file.length().toInt + val channel = new RandomAccessFile(file, "r").getChannel() + Some(channel.map(MapMode.READ_WRITE, 0, length)) + } + + override def getValues(blockId: String): Option[Iterator[Any]] = { + val file = getFile(blockId) + val length = file.length().toInt + val channel = new RandomAccessFile(file, "r").getChannel() + val bytes = channel.map(MapMode.READ_ONLY, 0, length) + val buffer = dataDeserialize(bytes) + channel.close() + Some(buffer) + } + + override def remove(blockId: String) { + throw new UnsupportedOperationException("Not implemented") + } + + private def createFile(blockId: String): File = { + val file = getFile(blockId) + if (file.exists()) { + throw new Exception("File for block " + blockId + " already exists on disk: " + file) + } + file + } + + private def getFile(blockId: String): File = { + logDebug("Getting file for block " + blockId) + + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = math.abs(blockId.hashCode) + val dirId = hash % localDirs.length + val subDirId = (hash / localDirs.length) % SUBDIRS_PER_LOCAL_DIR + + // Create the subdirectory if it doesn't already exist + val subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) + newDir.mkdir() + subDirs(dirId)(subDirId) = newDir + newDir + } + } + + new File(subDir, blockId) + } + + private def createLocalDirs(): Array[File] = { + logDebug("Creating local directories at root dirs '" + rootDirs + "'") + rootDirs.split(",").map(rootDir => { + var foundLocalDir: Boolean = false + var localDir: File = null + var localDirUuid: UUID = null + var tries = 0 + while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + tries += 1 + try { + localDirUuid = UUID.randomUUID() + localDir = new File(rootDir, "spark-local-" + localDirUuid) + if (!localDir.exists) { + localDir.mkdirs() + foundLocalDir = true + } + } catch { + case e: Exception => + logWarning("Attempt " + tries + " to create local dir failed", e) + } + } + if (!foundLocalDir) { + logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + + " attempts to create local dir in " + rootDir) + System.exit(1) + } + logInfo("Created local directory at " + localDir) + localDir + }) + } + + private def addShutdownHook() { + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { + override def run() { + logDebug("Shutdown hook called") + localDirs.foreach(localDir => Utils.deleteRecursively(localDir)) + } + }) + } +} diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala new file mode 100644 index 0000000000..24a80b7f96 --- /dev/null +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -0,0 +1,171 @@ +package spark.storage + +import java.util.LinkedHashMap +import java.util.concurrent.ArrayBlockingQueue +import spark.{SizeEstimator, Utils} +import java.nio.ByteBuffer +import collection.mutable.ArrayBuffer + +/** + * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as + * serialized ByteBuffers. + */ +class MemoryStore(blockManager: BlockManager, maxMemory: Long) + extends BlockStore(blockManager) { + + case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) + + private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true) + private var currentMemory = 0L + + //private val blockDropper = Executors.newSingleThreadExecutor() + private val blocksToDrop = new ArrayBlockingQueue[String](10000, true) + private val blockDropper = new Thread("memory store - block dropper") { + override def run() { + try{ + while (true) { + val blockId = blocksToDrop.take() + logDebug("Block " + blockId + " ready to be dropped") + blockManager.dropFromMemory(blockId) + } + } catch { + case ie: InterruptedException => + logInfo("Shutting down block dropper") + } + } + } + blockDropper.start() + logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory))) + + def freeMemory: Long = maxMemory - currentMemory + + override def getSize(blockId: String): Long = { + memoryStore.synchronized { + memoryStore.get(blockId).size + } + } + + override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { + if (level.deserialized) { + bytes.rewind() + val values = dataDeserialize(bytes) + val elements = new ArrayBuffer[Any] + elements ++= values + val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) + ensureFreeSpace(sizeEstimate) + val entry = new Entry(elements, sizeEstimate, true) + memoryStore.synchronized { memoryStore.put(blockId, entry) } + currentMemory += sizeEstimate + logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format( + blockId, sizeEstimate, freeMemory)) + } else { + val entry = new Entry(bytes, bytes.limit, false) + ensureFreeSpace(bytes.limit) + memoryStore.synchronized { memoryStore.put(blockId, entry) } + currentMemory += bytes.limit + logInfo("Block %s stored as %d bytes to memory (free %d)".format( + blockId, bytes.limit, freeMemory)) + } + } + + override def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) + : Either[Iterator[Any], ByteBuffer] = { + + if (level.deserialized) { + val elements = new ArrayBuffer[Any] + elements ++= values + val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) + ensureFreeSpace(sizeEstimate) + val entry = new Entry(elements, sizeEstimate, true) + memoryStore.synchronized { memoryStore.put(blockId, entry) } + currentMemory += sizeEstimate + logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format( + blockId, sizeEstimate, freeMemory)) + return Left(elements.iterator) + } else { + val bytes = dataSerialize(values) + ensureFreeSpace(bytes.limit) + val entry = new Entry(bytes, bytes.limit, false) + memoryStore.synchronized { memoryStore.put(blockId, entry) } + currentMemory += bytes.limit + logInfo("Block %s stored as %d bytes to memory (free %d)".format( + blockId, bytes.limit, freeMemory)) + return Right(bytes) + } + } + + override def getBytes(blockId: String): Option[ByteBuffer] = { + throw new UnsupportedOperationException("Not implemented") + } + + override def getValues(blockId: String): Option[Iterator[Any]] = { + val entry = memoryStore.synchronized { memoryStore.get(blockId) } + if (entry == null) { + return None + } + if (entry.deserialized) { + return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) + } else { + return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())) + } + } + + override def remove(blockId: String) { + memoryStore.synchronized { + val entry = memoryStore.get(blockId) + if (entry != null) { + memoryStore.remove(blockId) + currentMemory -= entry.size + logInfo("Block %s of size %d dropped from memory (free %d)".format( + blockId, entry.size, freeMemory)) + } else { + logWarning("Block " + blockId + " could not be removed as it doesnt exist") + } + } + } + + override def clear() { + memoryStore.synchronized { + memoryStore.clear() + } + //blockDropper.shutdown() + blockDropper.interrupt() + logInfo("MemoryStore cleared") + } + + private def ensureFreeSpace(space: Long) { + logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( + space, currentMemory, maxMemory)) + + if (maxMemory - currentMemory < space) { + + val selectedBlocks = new ArrayBuffer[String]() + var selectedMemory = 0L + + memoryStore.synchronized { + val iter = memoryStore.entrySet().iterator() + while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) { + val pair = iter.next() + val blockId = pair.getKey + val entry = pair.getValue + if (!entry.dropPending) { + selectedBlocks += blockId + entry.dropPending = true + } + selectedMemory += pair.getValue.size + logInfo("Block " + blockId + " selected for dropping") + } + } + + logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " + + blocksToDrop.size + " blocks pending") + var i = 0 + while (i < selectedBlocks.size) { + blocksToDrop.add(selectedBlocks(i)) + i += 1 + } + selectedBlocks.clear() + } + } +} + |