author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-09-28 18:28:13 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-09-28 18:28:13 -0700
commit     ae8c7d6cfaa3196edef03d10e7f84c3d5e6193c5 (patch)
tree       e3cbed2cdbad1d92411f0a78654066e6cd47696d
parent     3d7267999dac59c901afcfd52ce3f40d007015ae (diff)
Made disk store use multiple directories, deleted ShuffleManager
-rw-r--r--  core/src/main/scala/spark/ShuffleManager.scala        98
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala               16
-rw-r--r--  core/src/main/scala/spark/Utils.scala                  12
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala    313
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala     152
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala   171
6 files changed, 345 insertions(+), 417 deletions(-)
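
The heart of the change: instead of the single flat "shuffle" directory that the deleted ShuffleManager created, the new DiskStore spreads block files across one directory per configured local dir and 128 hashed subdirectories inside each. Below is a minimal sketch of the resulting block-to-path mapping, mirroring DiskStore.getFile further down; the object and method names are illustrative, and the assumption that the comma-separated root dirs come from spark.local.dir is not shown in this patch.

    // Illustrative sketch of the new on-disk layout: hash each block id to a
    // local dir, then to one of 128 subdirectories inside it.
    object DiskLayoutSketch {
      val SUBDIRS_PER_LOCAL_DIR = 128

      def pathFor(blockId: String, localDirs: Array[String]): String = {
        val hash = math.abs(blockId.hashCode)
        val dirId = hash % localDirs.length                               // pick a local dir
        val subDirId = (hash / localDirs.length) % SUBDIRS_PER_LOCAL_DIR  // pick a subdir
        "%s/%02x/%s".format(localDirs(dirId), subDirId, blockId)
      }

      def main(args: Array[String]) {
        val dirs = Array("/mnt1/spark-local", "/mnt2/spark-local")
        Seq("shuffle_1_2_3", "rdd_4_0").foreach(id => println(pathFor(id, dirs)))
      }
    }

Spreading files this way keeps any single directory from accumulating tens of thousands of entries, which is what the "really large inodes" comment in DiskStore.scala below is getting at.
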
diff --git a/core/src/main/scala/spark/ShuffleManager.scala b/core/src/main/scala/spark/ShuffleManager.scala
deleted file mode 100644
index 24af7f3a08..0000000000
--- a/core/src/main/scala/spark/ShuffleManager.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-package spark
-
-import java.io._
-import java.net.URL
-import java.util.UUID
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
-import spark._
-
-class ShuffleManager extends Logging {
- private var nextShuffleId = new AtomicLong(0)
-
- private var shuffleDir: File = null
- private var server: HttpServer = null
- private var serverUri: String = null
-
- initialize()
-
- private def initialize() {
- // TODO: localDir should be created by some mechanism common to Spark
- // so that it can be shared among shuffle, broadcast, etc
- val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
- var tries = 0
- var foundLocalDir = false
- var localDir: File = null
- var localDirUuid: UUID = null
- while (!foundLocalDir && tries < 10) {
- tries += 1
- try {
- localDirUuid = UUID.randomUUID
- localDir = new File(localDirRoot, "spark-local-" + localDirUuid)
- if (!localDir.exists) {
- localDir.mkdirs()
- foundLocalDir = true
- }
- } catch {
- case e: Exception =>
- logWarning("Attempt " + tries + " to create local dir failed", e)
- }
- }
- if (!foundLocalDir) {
- logError("Failed 10 attempts to create local dir in " + localDirRoot)
- System.exit(1)
- }
- shuffleDir = new File(localDir, "shuffle")
- shuffleDir.mkdirs()
- logInfo("Shuffle dir: " + shuffleDir)
-
- // Add a shutdown hook to delete the local dir
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dir") {
- override def run() {
- Utils.deleteRecursively(localDir)
- }
- })
-
- val extServerPort = System.getProperty(
- "spark.localFileShuffle.external.server.port", "-1").toInt
- if (extServerPort != -1) {
- // We're using an external HTTP server; set URI relative to its root
- var extServerPath = System.getProperty(
- "spark.localFileShuffle.external.server.path", "")
- if (extServerPath != "" && !extServerPath.endsWith("/")) {
- extServerPath += "/"
- }
- serverUri = "http://%s:%d/%s/spark-local-%s".format(
- Utils.localIpAddress, extServerPort, extServerPath, localDirUuid)
- } else {
- // Create our own server
- server = new HttpServer(localDir)
- server.start()
- serverUri = server.uri
- }
- logInfo("Local URI: " + serverUri)
- }
-
- def stop() {
- if (server != null) {
- server.stop()
- }
- }
-
- def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
- val dir = new File(shuffleDir, shuffleId + "/" + inputId)
- dir.mkdirs()
- val file = new File(dir, "" + outputId)
- return file
- }
-
- def getServerUri(): String = {
- serverUri
- }
-
- def newShuffleId(): Long = {
- nextShuffleId.getAndIncrement()
- }
-}
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 2c9f46b1a0..280c00b486 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -10,6 +10,13 @@ import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
import spark.util.AkkaUtils
+/**
+ * Holds all the runtime environment objects for a running Spark instance (either master or worker),
+ * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
+ * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
+ * objects needs to have the right SparkEnv set. You can get the current environment with
+ * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
+ */
class SparkEnv (
val actorSystem: ActorSystem,
val cache: Cache,
@@ -18,7 +25,6 @@ class SparkEnv (
val cacheTracker: CacheTracker,
val mapOutputTracker: MapOutputTracker,
val shuffleFetcher: ShuffleFetcher,
- val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val connectionManager: ConnectionManager,
@@ -27,7 +33,7 @@ class SparkEnv (
/** No-parameter constructor for unit tests. */
def this() = {
- this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null, null, null)
+ this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null, null)
}
def stop() {
@@ -35,7 +41,6 @@ class SparkEnv (
mapOutputTracker.stop()
cacheTracker.stop()
shuffleFetcher.stop()
- shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
@@ -88,9 +93,7 @@ object SparkEnv {
val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
val blockManager = new BlockManager(blockManagerMaster, serializer)
- val connectionManager = blockManager.connectionManager
-
- val shuffleManager = new ShuffleManager()
+ val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isMaster)
@@ -119,7 +122,6 @@ object SparkEnv {
cacheTracker,
mapOutputTracker,
shuffleFetcher,
- shuffleManager,
broadcastManager,
blockManager,
connectionManager,
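
The doc comment added above is the operative description of how SparkEnv is meant to be used: code reaches it through a thread-local, so any thread touching these objects must set it first. A hedged usage sketch follows, valid only once a SparkContext has created the environment; the worker thread itself is invented.

    // Only SparkEnv.get / SparkEnv.set come from Spark; everything else here is
    // an invented example of propagating the environment to a new thread.
    val env = SparkEnv.get                      // environment of the current thread
    new Thread("sketch-worker") {
      override def run() {
        SparkEnv.set(env)                       // make it visible to this thread too
        val blockManager = SparkEnv.get.blockManager
        println("block manager available: " + (blockManager != null))
      }
    }.start()
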
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 802277a251..ff28d52484 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -355,8 +355,8 @@ private object Utils extends Logging {
* This is used, for example, to tell users where in their code each RDD got created.
*/
def getSparkCallSite: String = {
- val trace = Thread.currentThread().getStackTrace().filter( el =>
- (!el.getMethodName().contains("getStackTrace")))
+ val trace = Thread.currentThread.getStackTrace().filter( el =>
+ (!el.getMethodName.contains("getStackTrace")))
// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
@@ -369,12 +369,12 @@ private object Utils extends Logging {
for (el <- trace) {
if (!finished) {
- if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) {
- lastSparkMethod = el.getMethodName()
+ if (el.getClassName.contains("spark") && !el.getClassName.startsWith("spark.examples")) {
+ lastSparkMethod = el.getMethodName
}
else {
- firstUserLine = el.getLineNumber()
- firstUserFile = el.getFileName()
+ firstUserLine = el.getLineNumber
+ firstUserFile = el.getFileName
finished = true
}
}
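
The Utils hunks above only drop redundant parentheses from no-argument accessors, but the surrounding getSparkCallSite logic (crawl the stack until the first frame outside the spark package) is easy to miss in a whitespace-level diff. A simplified, self-contained sketch of the same idea; the package filter here is cruder than the real method's.

    // Report the first stack frame outside the spark package, in the spirit of
    // Utils.getSparkCallSite. Simplified: no tracking of the last Spark method.
    object CallSiteSketch {
      def firstUserFrame: String = {
        val trace = Thread.currentThread.getStackTrace.filter(
          el => !el.getMethodName.contains("getStackTrace"))
        trace.find(el => !el.getClassName.startsWith("spark."))
          .map(el => "%s:%d".format(el.getFileName, el.getLineNumber))
          .getOrElse("<unknown>")
      }

      def main(args: Array[String]) {
        println(firstUserFrame)
      }
    }
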
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 7696aa9567..64773a3b03 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,27 +1,21 @@
package spark.storage
-import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
-import java.nio.channels.FileChannel.MapMode
-import java.util.{LinkedHashMap, UUID}
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
-import scala.collection.mutable.ArrayBuffer
-
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
-import spark.{Utils, Logging, Serializer, SizeEstimator}
+import spark.Logging
/**
* Abstract class to store blocks
*/
abstract class BlockStore(val blockManager: BlockManager) extends Logging {
- initLogging()
-
- def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
+ def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
+ /**
+ * Put in a block and return its content as either bytes or another Iterator. This is used
+ * to efficiently write the values to multiple locations (e.g. for replication).
+ */
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
- : Either[Iterator[Any], ByteBuffer]
+ : Either[Iterator[Any], ByteBuffer]
/**
* Return the size of a block.
@@ -40,296 +34,3 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def clear() { }
}
-
-/**
- * Class to store blocks in memory
- */
-class MemoryStore(blockManager: BlockManager, maxMemory: Long)
- extends BlockStore(blockManager) {
-
- case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
-
- private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
- private var currentMemory = 0L
-
- //private val blockDropper = Executors.newSingleThreadExecutor()
- private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
- private val blockDropper = new Thread("memory store - block dropper") {
- override def run() {
- try{
- while (true) {
- val blockId = blocksToDrop.take()
- logDebug("Block " + blockId + " ready to be dropped")
- blockManager.dropFromMemory(blockId)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Shutting down block dropper")
- }
- }
- }
- blockDropper.start()
- logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
-
- def freeMemory: Long = maxMemory - currentMemory
-
- def getSize(blockId: String): Long = memoryStore.synchronized { memoryStore.get(blockId).size }
-
- def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
- if (level.deserialized) {
- bytes.rewind()
- val values = dataDeserialize(bytes)
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- ensureFreeSpace(sizeEstimate)
- val entry = new Entry(elements, sizeEstimate, true)
- memoryStore.synchronized { memoryStore.put(blockId, entry) }
- currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
- blockId, sizeEstimate, freeMemory))
- } else {
- val entry = new Entry(bytes, bytes.limit, false)
- ensureFreeSpace(bytes.limit)
- memoryStore.synchronized { memoryStore.put(blockId, entry) }
- currentMemory += bytes.limit
- logInfo("Block %s stored as %d bytes to memory (free %d)".format(
- blockId, bytes.limit, freeMemory))
- }
- }
-
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
- : Either[Iterator[Any], ByteBuffer] = {
- if (level.deserialized) {
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- ensureFreeSpace(sizeEstimate)
- val entry = new Entry(elements, sizeEstimate, true)
- memoryStore.synchronized { memoryStore.put(blockId, entry) }
- currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
- blockId, sizeEstimate, freeMemory))
- return Left(elements.iterator)
- } else {
- val bytes = dataSerialize(values)
- ensureFreeSpace(bytes.limit)
- val entry = new Entry(bytes, bytes.limit, false)
- memoryStore.synchronized { memoryStore.put(blockId, entry) }
- currentMemory += bytes.limit
- logInfo("Block %s stored as %d bytes to memory (free %d)".format(
- blockId, bytes.limit, freeMemory))
- return Right(bytes)
- }
- }
-
- def getBytes(blockId: String): Option[ByteBuffer] = {
- throw new UnsupportedOperationException("Not implemented")
- }
-
- def getValues(blockId: String): Option[Iterator[Any]] = {
- val entry = memoryStore.synchronized { memoryStore.get(blockId) }
- if (entry == null) {
- return None
- }
- if (entry.deserialized) {
- return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
- } else {
- return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
- }
- }
-
- def remove(blockId: String) {
- memoryStore.synchronized {
- val entry = memoryStore.get(blockId)
- if (entry != null) {
- memoryStore.remove(blockId)
- currentMemory -= entry.size
- logInfo("Block %s of size %d dropped from memory (free %d)".format(
- blockId, entry.size, freeMemory))
- } else {
- logWarning("Block " + blockId + " could not be removed as it doesnt exist")
- }
- }
- }
-
- override def clear() {
- memoryStore.synchronized {
- memoryStore.clear()
- }
- //blockDropper.shutdown()
- blockDropper.interrupt()
- logInfo("MemoryStore cleared")
- }
-
- private def ensureFreeSpace(space: Long) {
- logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
- space, currentMemory, maxMemory))
-
- if (maxMemory - currentMemory < space) {
-
- val selectedBlocks = new ArrayBuffer[String]()
- var selectedMemory = 0L
-
- memoryStore.synchronized {
- val iter = memoryStore.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
- val pair = iter.next()
- val blockId = pair.getKey
- val entry = pair.getValue()
- if (!entry.dropPending) {
- selectedBlocks += blockId
- entry.dropPending = true
- }
- selectedMemory += pair.getValue.size
- logInfo("Block " + blockId + " selected for dropping")
- }
- }
-
- logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
- blocksToDrop.size + " blocks pending")
- var i = 0
- while (i < selectedBlocks.size) {
- blocksToDrop.add(selectedBlocks(i))
- i += 1
- }
- selectedBlocks.clear()
- }
- }
-}
-
-
-/**
- * Class to store blocks in disk
- */
-class DiskStore(blockManager: BlockManager, rootDirs: String)
- extends BlockStore(blockManager) {
-
- val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- val localDirs = createLocalDirs()
- var lastLocalDirUsed = 0
-
- addShutdownHook()
-
- def getSize(blockId: String): Long = {
- getFile(blockId).length
- }
-
- def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
- logDebug("Attempting to put block " + blockId)
- val startTime = System.currentTimeMillis
- val file = createFile(blockId)
- val channel = new RandomAccessFile(file, "rw").getChannel()
- val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
- buffer.put(bytes)
- channel.close()
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
- blockId, bytes.limit, (finishTime - startTime)))
- }
-
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
- : Either[Iterator[Any], ByteBuffer] = {
-
- logDebug("Attempting to write values for block " + blockId)
- val file = createFile(blockId)
- val fileOut = blockManager.wrapForCompression(
- new FastBufferedOutputStream(new FileOutputStream(file)))
- val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
- objOut.writeAll(values)
- objOut.close()
-
- // Return a byte buffer for the contents of the file
- val channel = new RandomAccessFile(file, "rw").getChannel()
- Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
- }
-
- def getBytes(blockId: String): Option[ByteBuffer] = {
- val file = getFile(blockId)
- val length = file.length().toInt
- val channel = new RandomAccessFile(file, "r").getChannel()
- Some(channel.map(MapMode.READ_WRITE, 0, length))
- }
-
- def getValues(blockId: String): Option[Iterator[Any]] = {
- val file = getFile(blockId)
- val length = file.length().toInt
- val channel = new RandomAccessFile(file, "r").getChannel()
- val bytes = channel.map(MapMode.READ_ONLY, 0, length)
- val buffer = dataDeserialize(bytes)
- channel.close()
- return Some(buffer)
- }
-
- def remove(blockId: String) {
- throw new UnsupportedOperationException("Not implemented")
- }
-
- private def createFile(blockId: String): File = {
- val file = getFile(blockId)
- if (file == null) {
- lastLocalDirUsed = (lastLocalDirUsed + 1) % localDirs.size
- val newFile = new File(localDirs(lastLocalDirUsed), blockId)
- newFile.getParentFile.mkdirs()
- return newFile
- } else {
- throw new Exception("File for block " + blockId + " already exists on disk, " + file)
- }
- }
-
- private def getFile(blockId: String): File = {
- logDebug("Getting file for block " + blockId)
- // Search for the file in all the local directories, only one of them should have the file
- val files = localDirs.map(localDir => new File(localDir, blockId)).filter(_.exists)
- if (files.size > 1) {
- throw new Exception("Multiple files for same block " + blockId + " exists: " +
- files.map(_.toString).reduceLeft(_ + ", " + _))
- return null
- } else if (files.size == 0) {
- return null
- } else {
- logDebug("Got file " + files(0) + " of size " + files(0).length + " bytes")
- return files(0)
- }
- }
-
- private def createLocalDirs(): Seq[File] = {
- logDebug("Creating local directories at root dirs '" + rootDirs + "'")
- rootDirs.split("[;,:]").map(rootDir => {
- var foundLocalDir: Boolean = false
- var localDir: File = null
- var localDirUuid: UUID = null
- var tries = 0
- while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
- tries += 1
- try {
- localDirUuid = UUID.randomUUID()
- localDir = new File(rootDir, "spark-local-" + localDirUuid)
- if (!localDir.exists) {
- localDir.mkdirs()
- foundLocalDir = true
- }
- } catch {
- case e: Exception =>
- logWarning("Attempt " + tries + " to create local dir failed", e)
- }
- }
- if (!foundLocalDir) {
- logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
- " attempts to create local dir in " + rootDir)
- System.exit(1)
- }
- logDebug("Created local directory at " + localDir)
- localDir
- })
- }
-
- private def addShutdownHook() {
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
- override def run() {
- logDebug("Shutdown hook called")
- localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
- }
- })
- }
-}
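
Note the doc comment this patch adds to BlockStore.putValues above: the store hands back either the still-deserialized values or the bytes it produced, so a caller that also needs to replicate the block does not have to serialize twice. Below is a self-contained sketch of such a caller; serialize and sendToReplica are stand-ins for the real BlockManager machinery, not APIs from this patch.

    import java.nio.ByteBuffer

    // Invented caller showing why putValues returns Either[Iterator[Any], ByteBuffer]:
    // reuse the serialized bytes when the store produced them, serialize once otherwise.
    object PutValuesSketch {
      def serialize(values: Iterator[Any]): ByteBuffer =
        ByteBuffer.wrap(values.mkString(",").getBytes("UTF-8"))

      def sendToReplica(blockId: String, bytes: ByteBuffer) {
        println("would replicate %s (%d bytes)".format(blockId, bytes.remaining))
      }

      def handlePutResult(blockId: String, result: Either[Iterator[Any], ByteBuffer]) {
        result match {
          case Right(bytes)   => sendToReplica(blockId, bytes.duplicate())
          case Left(iterator) => sendToReplica(blockId, serialize(iterator))
        }
      }

      def main(args: Array[String]) {
        handlePutResult("rdd_0_0", Left(Iterator("a", "b", "c")))
      }
    }
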
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
new file mode 100644
index 0000000000..3845cb5d0e
--- /dev/null
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -0,0 +1,152 @@
+package spark.storage
+
+import java.nio.ByteBuffer
+import java.io.{File, FileOutputStream, RandomAccessFile}
+import java.nio.channels.FileChannel.MapMode
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+import java.util.UUID
+import spark.Utils
+
+/**
+ * Stores BlockManager blocks on disk.
+ */
+class DiskStore(blockManager: BlockManager, rootDirs: String)
+ extends BlockStore(blockManager) {
+
+ val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+ val SUBDIRS_PER_LOCAL_DIR = 128
+
+ // Create one local directory for each path mentioned in spark.local.dir; then, inside this
+ // directory, create multiple subdirectories that we will hash files into, in order to avoid
+ // having really large inodes at the top level.
+ val localDirs = createLocalDirs()
+ val subDirs = Array.fill(localDirs.length)(new Array[File](SUBDIRS_PER_LOCAL_DIR))
+
+ addShutdownHook()
+
+ override def getSize(blockId: String): Long = {
+ getFile(blockId).length
+ }
+
+ override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
+ logDebug("Attempting to put block " + blockId)
+ val startTime = System.currentTimeMillis
+ val file = createFile(blockId)
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+ buffer.put(bytes)
+ channel.close()
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+ blockId, bytes.limit, (finishTime - startTime)))
+ }
+
+ override def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+ : Either[Iterator[Any], ByteBuffer] = {
+
+ logDebug("Attempting to write values for block " + blockId)
+ val file = createFile(blockId)
+ val fileOut = blockManager.wrapForCompression(
+ new FastBufferedOutputStream(new FileOutputStream(file)))
+ val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
+ objOut.writeAll(values)
+ objOut.close()
+
+ // Return a byte buffer for the contents of the file
+ val channel = new RandomAccessFile(file, "rw").getChannel()
+ Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
+ }
+
+ override def getBytes(blockId: String): Option[ByteBuffer] = {
+ val file = getFile(blockId)
+ val length = file.length().toInt
+ val channel = new RandomAccessFile(file, "r").getChannel()
+ Some(channel.map(MapMode.READ_WRITE, 0, length))
+ }
+
+ override def getValues(blockId: String): Option[Iterator[Any]] = {
+ val file = getFile(blockId)
+ val length = file.length().toInt
+ val channel = new RandomAccessFile(file, "r").getChannel()
+ val bytes = channel.map(MapMode.READ_ONLY, 0, length)
+ val buffer = dataDeserialize(bytes)
+ channel.close()
+ Some(buffer)
+ }
+
+ override def remove(blockId: String) {
+ throw new UnsupportedOperationException("Not implemented")
+ }
+
+ private def createFile(blockId: String): File = {
+ val file = getFile(blockId)
+ if (file.exists()) {
+ throw new Exception("File for block " + blockId + " already exists on disk: " + file)
+ }
+ file
+ }
+
+ private def getFile(blockId: String): File = {
+ logDebug("Getting file for block " + blockId)
+
+ // Figure out which local directory it hashes to, and which subdirectory in that
+ val hash = math.abs(blockId.hashCode)
+ val dirId = hash % localDirs.length
+ val subDirId = (hash / localDirs.length) % SUBDIRS_PER_LOCAL_DIR
+
+ // Create the subdirectory if it doesn't already exist
+ val subDir = subDirs(dirId).synchronized {
+ val old = subDirs(dirId)(subDirId)
+ if (old != null) {
+ old
+ } else {
+ val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+ newDir.mkdir()
+ subDirs(dirId)(subDirId) = newDir
+ newDir
+ }
+ }
+
+ new File(subDir, blockId)
+ }
+
+ private def createLocalDirs(): Array[File] = {
+ logDebug("Creating local directories at root dirs '" + rootDirs + "'")
+ rootDirs.split(",").map(rootDir => {
+ var foundLocalDir: Boolean = false
+ var localDir: File = null
+ var localDirUuid: UUID = null
+ var tries = 0
+ while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
+ tries += 1
+ try {
+ localDirUuid = UUID.randomUUID()
+ localDir = new File(rootDir, "spark-local-" + localDirUuid)
+ if (!localDir.exists) {
+ localDir.mkdirs()
+ foundLocalDir = true
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Attempt " + tries + " to create local dir failed", e)
+ }
+ }
+ if (!foundLocalDir) {
+ logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
+ " attempts to create local dir in " + rootDir)
+ System.exit(1)
+ }
+ logInfo("Created local directory at " + localDir)
+ localDir
+ })
+ }
+
+ private def addShutdownHook() {
+ Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
+ override def run() {
+ logDebug("Shutdown hook called")
+ localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
+ }
+ })
+ }
+}
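
One detail worth calling out in the new DiskStore above: putBytes and getBytes go through memory-mapped files rather than stream copies. A minimal standalone sketch of that write path; the temp file and payload are invented.

    import java.io.{File, RandomAccessFile}
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel.MapMode

    // Map a region the size of the buffer and copy the bytes into it, as
    // DiskStore.putBytes does above.
    object MappedWriteSketch {
      def writeBlock(file: File, bytes: ByteBuffer) {
        val channel = new RandomAccessFile(file, "rw").getChannel()
        val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
        buffer.put(bytes)
        channel.close()
      }

      def main(args: Array[String]) {
        val file = File.createTempFile("spark-block-", ".bin")
        writeBlock(file, ByteBuffer.wrap("hello".getBytes("UTF-8")))
        println("wrote %d bytes to %s".format(file.length, file))
      }
    }
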
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
new file mode 100644
index 0000000000..24a80b7f96
--- /dev/null
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -0,0 +1,171 @@
+package spark.storage
+
+import java.util.LinkedHashMap
+import java.util.concurrent.ArrayBlockingQueue
+import spark.{SizeEstimator, Utils}
+import java.nio.ByteBuffer
+import collection.mutable.ArrayBuffer
+
+/**
+ * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
+ * serialized ByteBuffers.
+ */
+class MemoryStore(blockManager: BlockManager, maxMemory: Long)
+ extends BlockStore(blockManager) {
+
+ case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
+
+ private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
+ private var currentMemory = 0L
+
+ //private val blockDropper = Executors.newSingleThreadExecutor()
+ private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
+ private val blockDropper = new Thread("memory store - block dropper") {
+ override def run() {
+ try{
+ while (true) {
+ val blockId = blocksToDrop.take()
+ logDebug("Block " + blockId + " ready to be dropped")
+ blockManager.dropFromMemory(blockId)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Shutting down block dropper")
+ }
+ }
+ }
+ blockDropper.start()
+ logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
+
+ def freeMemory: Long = maxMemory - currentMemory
+
+ override def getSize(blockId: String): Long = {
+ memoryStore.synchronized {
+ memoryStore.get(blockId).size
+ }
+ }
+
+ override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
+ if (level.deserialized) {
+ bytes.rewind()
+ val values = dataDeserialize(bytes)
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
+ ensureFreeSpace(sizeEstimate)
+ val entry = new Entry(elements, sizeEstimate, true)
+ memoryStore.synchronized { memoryStore.put(blockId, entry) }
+ currentMemory += sizeEstimate
+ logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+ blockId, sizeEstimate, freeMemory))
+ } else {
+ val entry = new Entry(bytes, bytes.limit, false)
+ ensureFreeSpace(bytes.limit)
+ memoryStore.synchronized { memoryStore.put(blockId, entry) }
+ currentMemory += bytes.limit
+ logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+ blockId, bytes.limit, freeMemory))
+ }
+ }
+
+ override def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+ : Either[Iterator[Any], ByteBuffer] = {
+
+ if (level.deserialized) {
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
+ ensureFreeSpace(sizeEstimate)
+ val entry = new Entry(elements, sizeEstimate, true)
+ memoryStore.synchronized { memoryStore.put(blockId, entry) }
+ currentMemory += sizeEstimate
+ logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+ blockId, sizeEstimate, freeMemory))
+ return Left(elements.iterator)
+ } else {
+ val bytes = dataSerialize(values)
+ ensureFreeSpace(bytes.limit)
+ val entry = new Entry(bytes, bytes.limit, false)
+ memoryStore.synchronized { memoryStore.put(blockId, entry) }
+ currentMemory += bytes.limit
+ logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+ blockId, bytes.limit, freeMemory))
+ return Right(bytes)
+ }
+ }
+
+ override def getBytes(blockId: String): Option[ByteBuffer] = {
+ throw new UnsupportedOperationException("Not implemented")
+ }
+
+ override def getValues(blockId: String): Option[Iterator[Any]] = {
+ val entry = memoryStore.synchronized { memoryStore.get(blockId) }
+ if (entry == null) {
+ return None
+ }
+ if (entry.deserialized) {
+ return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ } else {
+ return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
+ }
+ }
+
+ override def remove(blockId: String) {
+ memoryStore.synchronized {
+ val entry = memoryStore.get(blockId)
+ if (entry != null) {
+ memoryStore.remove(blockId)
+ currentMemory -= entry.size
+ logInfo("Block %s of size %d dropped from memory (free %d)".format(
+ blockId, entry.size, freeMemory))
+ } else {
+ logWarning("Block " + blockId + " could not be removed as it doesnt exist")
+ }
+ }
+ }
+
+ override def clear() {
+ memoryStore.synchronized {
+ memoryStore.clear()
+ }
+ //blockDropper.shutdown()
+ blockDropper.interrupt()
+ logInfo("MemoryStore cleared")
+ }
+
+ private def ensureFreeSpace(space: Long) {
+ logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
+ space, currentMemory, maxMemory))
+
+ if (maxMemory - currentMemory < space) {
+
+ val selectedBlocks = new ArrayBuffer[String]()
+ var selectedMemory = 0L
+
+ memoryStore.synchronized {
+ val iter = memoryStore.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
+ val pair = iter.next()
+ val blockId = pair.getKey
+ val entry = pair.getValue
+ if (!entry.dropPending) {
+ selectedBlocks += blockId
+ entry.dropPending = true
+ }
+ selectedMemory += pair.getValue.size
+ logInfo("Block " + blockId + " selected for dropping")
+ }
+ }
+
+ logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
+ blocksToDrop.size + " blocks pending")
+ var i = 0
+ while (i < selectedBlocks.size) {
+ blocksToDrop.add(selectedBlocks(i))
+ i += 1
+ }
+ selectedBlocks.clear()
+ }
+ }
+}
+
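
A final note on the MemoryStore half of the move: the LinkedHashMap above is constructed with accessOrder = true (the third constructor argument), so iterating it in ensureFreeSpace visits the least recently used blocks first. A tiny standalone demonstration; the block ids and sizes are invented.

    import java.util.LinkedHashMap

    // With accessOrder = true, a get() moves the entry to the back, so iteration
    // starts from the least recently used key.
    object LruOrderSketch {
      def main(args: Array[String]) {
        val map = new LinkedHashMap[String, Long](32, 0.75f, true)
        map.put("rdd_0_0", 100L)
        map.put("rdd_0_1", 200L)
        map.put("rdd_0_2", 300L)
        map.get("rdd_0_0")                       // touch the oldest entry

        val iter = map.keySet().iterator()
        while (iter.hasNext) {
          println(iter.next())                   // rdd_0_1, rdd_0_2, rdd_0_0
        }
      }
    }
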