Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala          |  83
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala  | 290
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala    |   2
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala     |   6
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala   | 170
5 files changed, 325 insertions(+), 226 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 5742e64b56..83c1b49203 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -55,7 +55,7 @@ class SparkContext(
val sparkHome: String,
val jars: Seq[String])
extends Logging {
-
+
def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil)
// Ensure logging is initialized before we spawn any threads
@@ -78,30 +78,30 @@ class SparkContext(
true,
isLocal)
SparkEnv.set(env)
-
+
// Used to store a URL for each static file/jar together with the file's local timestamp
val addedFiles = HashMap[String, Long]()
val addedJars = HashMap[String, Long]()
-
+
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
-
+
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
- val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+ val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
- val LOCAL_CLUSTER_REGEX = """local-cluster\[([0-9]+),([0-9]+),([0-9]+)]""".r
+ val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
-
+
master match {
- case "local" =>
+ case "local" =>
new LocalScheduler(1, 0, this)
- case LOCAL_N_REGEX(threads) =>
+ case LOCAL_N_REGEX(threads) =>
new LocalScheduler(threads.toInt, 0, this)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
@@ -112,10 +112,21 @@ class SparkContext(
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
-
- case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
+
+ case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+ // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+ val memoryPerSlaveInt = memoryPerSlave.toInt
+ val sparkMemEnv = System.getenv("SPARK_MEM")
+ val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
+ if (sparkMemEnvInt > memoryPerSlaveInt) {
+ throw new SparkException(
+ "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
+ memoryPerSlaveInt, sparkMemEnvInt))
+ }
+
val scheduler = new ClusterScheduler(this)
- val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt)
+ val localCluster = new LocalSparkCluster(
+ numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
@@ -140,13 +151,13 @@ class SparkContext(
taskScheduler.start()
private var dagScheduler = new DAGScheduler(taskScheduler)
-
+
// Methods for creating RDDs
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
new ParallelCollection[T](this, seq, numSlices)
}
-
+
def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
parallelize(seq, numSlices)
}
@@ -187,14 +198,14 @@ class SparkContext(
}
/**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
+ * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
: RDD[(K, V)] = {
hadoopFile(path,
- fm.erasure.asInstanceOf[Class[F]],
+ fm.erasure.asInstanceOf[Class[F]],
km.erasure.asInstanceOf[Class[K]],
vm.erasure.asInstanceOf[Class[V]],
minSplits)
@@ -215,7 +226,7 @@ class SparkContext(
new Configuration)
}
- /**
+ /**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@@ -231,7 +242,7 @@ class SparkContext(
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}
- /**
+ /**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@@ -257,14 +268,14 @@ class SparkContext(
sequenceFile(path, keyClass, valueClass, defaultMinSplits)
/**
- * Version of sequenceFile() for types implicitly convertible to Writables through a
+ * Version of sequenceFile() for types implicitly convertible to Writables through a
* WritableConverter.
*
* WritableConverters are provided in a somewhat strange way (by an implicit function) to support
- * both subclasses of Writable and types for which we define a converter (e.g. Int to
+ * both subclasses of Writable and types for which we define a converter (e.g. Int to
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
- * have a parameterized singleton object). We use functions instead to create a new converter
+ * have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
@@ -289,7 +300,7 @@ class SparkContext(
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T: ClassManifest](
- path: String,
+ path: String,
minSplits: Int = defaultMinSplits
): RDD[T] = {
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
@@ -318,7 +329,7 @@ class SparkContext(
/**
* Create an accumulator from a "mutable collection" type.
- *
+ *
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
@@ -329,7 +340,7 @@ class SparkContext(
// Keep around a weak hash map of values to Cached versions?
def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal)
-
+
// Adds a file dependency to all Tasks executed in the future.
def addFile(path: String) {
val uri = new URI(path)
@@ -338,11 +349,11 @@ class SparkContext(
case _ => path
}
addedFiles(key) = System.currentTimeMillis
-
+
// Fetch the file locally in case the task is executed locally
val filename = new File(path.split("/").last)
Utils.fetchFile(path, new File("."))
-
+
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
@@ -350,7 +361,7 @@ class SparkContext(
addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedFiles.clear()
}
-
+
// Adds a jar dependency to all Tasks executed in the future.
def addJar(path: String) {
val uri = new URI(path)
@@ -366,7 +377,7 @@ class SparkContext(
addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedJars.clear()
}
-
+
// Stop the SparkContext
def stop() {
dagScheduler.stop()
@@ -400,7 +411,7 @@ class SparkContext(
/**
* Run a function on a given set of partitions in an RDD and return the results. This is the main
* entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
- * whether the scheduler can run the computation on the master rather than shipping it out to the
+ * whether the scheduler can run the computation on the master rather than shipping it out to the
* cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
@@ -419,13 +430,13 @@ class SparkContext(
def runJob[T, U: ClassManifest](
rdd: RDD[T],
- func: Iterator[T] => U,
+ func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
}
-
+
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
@@ -472,7 +483,7 @@ class SparkContext(
private[spark] def newShuffleId(): Int = {
nextShuffleId.getAndIncrement()
}
-
+
private var nextRddId = new AtomicInteger(0)
// Register a new RDD, returning its RDD ID
@@ -500,7 +511,7 @@ object SparkContext {
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
-
+
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
@@ -521,7 +532,7 @@ object SparkContext {
implicit def longToLongWritable(l: Long) = new LongWritable(l)
implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
-
+
implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
@@ -532,7 +543,7 @@ object SparkContext {
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
-
+
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
}
@@ -576,7 +587,7 @@ object SparkContext {
Nil
}
}
-
+
// Find the JAR that contains the class of a particular object
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 7d8f9ff824..21a2901548 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -61,15 +61,31 @@ private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
- case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
+ class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) {
+ def waitForReady() {
+ if (pending) {
+ synchronized {
+ while (pending) this.wait()
+ }
+ }
+ }
+
+ def markReady() {
+ pending = false
+ synchronized {
+ this.notifyAll()
+ }
+ }
+ }
private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS)
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
+
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
- private[storage] val diskStore: BlockStore = new DiskStore(this,
- System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ private[storage] val diskStore: BlockStore =
+ new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
@@ -81,7 +97,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
var cacheTracker: CacheTracker = null
val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
-
val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
initialize()
@@ -112,45 +127,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Change the storage level for a local block in the block info meta data, and
- * tell the master if necessary. Note that this is only a meta data change and
- * does NOT actually change the storage of the block. If the new level is
- * invalid, then block info (if exists) will be silently removed.
+ * Tell the master about the current storage status of a block. This will send a heartbeat
+ * message reflecting the current status, *not* the desired storage level in its block info.
+ * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
- private[spark] def setLevelAndTellMaster(
- blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
-
- if (level == null) {
- throw new IllegalArgumentException("Storage level is null")
- }
-
- // If there was earlier info about the block, then use earlier tellMaster
- val oldInfo = blockInfo.get(blockId)
- val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
- if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
- logWarning("Ignoring tellMaster setting as it is different from earlier setting")
- }
-
- // If level is valid, store the block info, else remove the block info
- if (level.isValid) {
- blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
- logDebug("Info for block " + blockId + " updated with new level as " + level)
- } else {
- blockInfo.remove(blockId)
- logDebug("Info for block " + blockId + " removed as new level is null or invalid")
- }
-
- // Tell master if necessary
- if (newTellMaster) {
+ def reportBlockStatus(blockId: String) {
+ locker.getLock(blockId).synchronized {
+ val curLevel = blockInfo.get(blockId) match {
+ case null =>
+ StorageLevel.NONE
+ case info =>
+ info.level match {
+ case null =>
+ StorageLevel.NONE
+ case level =>
+ val inMem = level.useMemory && memoryStore.contains(blockId)
+ val onDisk = level.useDisk && diskStore.contains(blockId)
+ new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ }
+ }
master.mustHeartBeat(HeartBeat(
blockManagerId,
blockId,
- level,
- if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
- if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
+ curLevel,
+ if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
+ if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
logDebug("Told master about block " + blockId)
- } else {
- logDebug("Did not tell master about block " + blockId)
}
}
@@ -182,36 +184,59 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocal(blockId: String): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId)
locker.getLock(blockId).synchronized {
- // Check storage level of block
- val level = getLevel(blockId)
- if (level != null) {
- logDebug("Level for block " + blockId + " is " + level + " on local machine")
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.waitForReady() // In case the block is still being put() by another thread
+ val level = info.level
+ logDebug("Level for block " + blockId + " is " + level)
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getValues(blockId) match {
- case Some(iterator) => {
- logDebug("Block " + blockId + " found in memory")
+ case Some(iterator) =>
return Some(iterator)
- }
- case None => {
+ case None =>
logDebug("Block " + blockId + " not found in memory")
- }
}
}
- // Look for block on disk
+ // Look for block on disk, potentially loading it back into memory if required
if (level.useDisk) {
logDebug("Getting block " + blockId + " from disk")
- diskStore.getValues(blockId) match {
- case Some(iterator) => {
- logDebug("Block " + blockId + " found in disk")
- return Some(iterator)
+ if (level.useMemory && level.deserialized) {
+ diskStore.getValues(blockId) match {
+ case Some(iterator) =>
+ // Put the block back in memory before returning it
+ memoryStore.putValues(blockId, iterator, level, true) match {
+ case Left(iterator2) =>
+ return Some(iterator2)
+ case _ =>
+ throw new Exception("Memory store did not return back an iterator")
+ }
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
- case None => {
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
- return None
+ } else if (level.useMemory && !level.deserialized) {
+ // Read it as a byte buffer into memory first, then return it
+ diskStore.getBytes(blockId) match {
+ case Some(bytes) =>
+ // Put a copy of the block back in memory before returning it. Note that we can't
+ // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ memoryStore.putBytes(blockId, copyForMemory, level)
+ bytes.rewind()
+ return Some(dataDeserialize(bytes))
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ }
+ } else {
+ diskStore.getValues(blockId) match {
+ case Some(iterator) =>
+ return Some(iterator)
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
}
@@ -226,39 +251,46 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Get block from the local block manager as serialized bytes.
*/
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
+ // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
logDebug("Getting local block " + blockId + " as bytes")
locker.getLock(blockId).synchronized {
- // Check storage level of block
- val level = getLevel(blockId)
- if (level != null) {
- logDebug("Level for block " + blockId + " is " + level + " on local machine")
+ val info = blockInfo.get(blockId)
+ if (info != null) {
+ info.waitForReady() // In case the block is still being put() by another thread
+ val level = info.level
+ logDebug("Level for block " + blockId + " is " + level)
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getBytes(blockId) match {
- case Some(bytes) => {
- logDebug("Block " + blockId + " found in memory")
+ case Some(bytes) =>
return Some(bytes)
- }
- case None => {
+ case None =>
logDebug("Block " + blockId + " not found in memory")
- }
}
}
// Look for block on disk
if (level.useDisk) {
- logDebug("Getting block " + blockId + " from disk")
+ // Read it as a byte buffer into memory first, then return it
diskStore.getBytes(blockId) match {
- case Some(bytes) => {
- logDebug("Block " + blockId + " found in disk")
+ case Some(bytes) =>
+ if (level.useMemory) {
+ if (level.deserialized) {
+ memoryStore.putBytes(blockId, bytes, level)
+ } else {
+ // The memory store will hang onto the ByteBuffer, so give it a copy instead of
+ // the memory-mapped file buffer we got from the disk store
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ memoryStore.putBytes(blockId, copyForMemory, level)
+ }
+ }
+ bytes.rewind()
return Some(bytes)
- }
- case None => {
+ case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
- return None
- }
}
}
} else {
@@ -433,6 +465,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
+ if (blockInfo.containsKey(blockId)) {
+ logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+ return
+ }
+
+ // Remember the block's storage level so that we can correctly drop it to disk if it needs
+ // to be dropped right after it got put into memory. Note, however, that other threads will
+ // not be able to get() this block until we call markReady on its BlockInfo.
+ val myInfo = new BlockInfo(level, tellMaster)
+ blockInfo.put(blockId, myInfo)
+
val startTimeMs = System.currentTimeMillis
var bytes: ByteBuffer = null
@@ -446,32 +489,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- // Check and warn if block with same id already exists
- if (getLevel(blockId) != null) {
- logWarning("Block " + blockId + " already exists in local machine")
- return
- }
-
- if (level.useMemory && level.useDisk) {
- // If saving to both memory and disk, then serialize only once
- memoryStore.putValues(blockId, values, level, true) match {
- case Left(newValues) =>
- diskStore.putValues(blockId, newValues, level, true) match {
- case Right(newBytes) => bytes = newBytes
- case _ => throw new Exception("Unexpected return value")
- }
- case Right(newBytes) =>
- bytes = newBytes
- diskStore.putBytes(blockId, newBytes, level)
- }
- } else if (level.useMemory) {
- // If only save to memory
+ if (level.useMemory) {
+ // Save it just to memory first, even if it also has useDisk set to true; we will later
+ // drop it to disk if the memory store can't hold it.
memoryStore.putValues(blockId, values, level, true) match {
case Right(newBytes) => bytes = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
- // If only save to disk
+ // Save directly to disk.
val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
diskStore.putValues(blockId, values, level, askForBytes) match {
case Right(newBytes) => bytes = newBytes
@@ -479,8 +505,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- // Store the storage level
- setLevelAndTellMaster(blockId, level, tellMaster)
+ // Now that the block is in either the memory or disk store, let other threads read it,
+ // and tell the master about it.
+ myInfo.markReady()
+ if (tellMaster) {
+ reportBlockStatus(blockId)
+ }
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -523,6 +553,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
+ if (blockInfo.containsKey(blockId)) {
+ logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+ return
+ }
+
+ // Remember the block's storage level so that we can correctly drop it to disk if it needs
+ // to be dropped right after it got put into memory. Note, however, that other threads will
+ // not be able to get() this block until we call markReady on its BlockInfo.
+ val myInfo = new BlockInfo(level, tellMaster)
+ blockInfo.put(blockId, myInfo)
+
val startTimeMs = System.currentTimeMillis
// Initiate the replication before storing it locally. This is faster as
@@ -539,22 +580,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
locker.getLock(blockId).synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- if (getLevel(blockId) != null) {
- logWarning("Block " + blockId + " already exists")
- return
- }
if (level.useMemory) {
+ // Store it only in memory at first, even if useDisk is also set to true
bytes.rewind()
memoryStore.putBytes(blockId, bytes, level)
- }
- if (level.useDisk) {
+ } else {
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
- // Store the storage level
- setLevelAndTellMaster(blockId, level, tellMaster)
+ // Now that the block is in either the memory or disk store, let other threads read it,
+ // and tell the master about it.
+ myInfo.markReady()
+ if (tellMaster) {
+ reportBlockStatus(blockId)
+ }
}
// TODO: This code will be removed when CacheTracker is gone.
@@ -606,11 +647,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// TODO: This code will be removed when CacheTracker is gone.
private def notifyTheCacheTracker(key: String) {
- val rddInfo = key.split("_")
- val rddId: Int = rddInfo(1).toInt
- val splitIndex: Int = rddInfo(2).toInt
- val host = System.getProperty("spark.hostname", Utils.localHostName())
- cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, splitIndex, host))
+ if (cacheTracker != null) {
+ val rddInfo = key.split("_")
+ val rddId: Int = rddInfo(1).toInt
+ val partition: Int = rddInfo(2).toInt
+ val host = System.getProperty("spark.hostname", Utils.localHostName())
+ cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, partition, host))
+ }
}
/**
@@ -628,22 +671,31 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Drop block from memory (called when memory store has reached it limit)
+ * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
+ * store reaches its limit and needs to free up space.
*/
- def dropFromMemory(blockId: String) {
+ def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+ logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized {
- val level = getLevel(blockId)
- if (level == null) {
- logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
- return
- }
- if (!level.useMemory) {
- logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
- return
+ val info = blockInfo.get(blockId)
+ val level = info.level
+ if (level.useDisk && !diskStore.contains(blockId)) {
+ logInfo("Writing block " + blockId + " to disk")
+ data match {
+ case Left(iterator) =>
+ diskStore.putValues(blockId, iterator, level, false)
+ case Right(bytes) =>
+ diskStore.putBytes(blockId, bytes, level)
+ }
}
memoryStore.remove(blockId)
- val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
- setLevelAndTellMaster(blockId, newLevel)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId)
+ }
+ if (!level.useDisk) {
+ // The block is completely gone from this node; forget it so we can put() it again later.
+ blockInfo.remove(blockId)
+ }
}
}
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 5f123aca78..ff482ff66b 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,5 +31,7 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def remove(blockId: String)
+ def contains(blockId: String): Boolean
+
def clear() { }
}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index d9965f4306..d0c592ccb1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
addShutdownHook()
override def getSize(blockId: String): Long = {
- getFile(blockId).length
+ getFile(blockId).length()
}
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
@@ -93,6 +93,10 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
}
+ override def contains(blockId: String): Boolean = {
+ getFile(blockId).exists()
+ }
+
private def createFile(blockId: String): File = {
val file = getFile(blockId)
if (file.exists()) {
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index ea6f3c4fcc..74ef326038 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -18,29 +18,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
- //private val blockDropper = Executors.newSingleThreadExecutor()
- private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
- private val blockDropper = new Thread("memory store - block dropper") {
- override def run() {
- try {
- while (true) {
- val blockId = blocksToDrop.take()
- logDebug("Block " + blockId + " ready to be dropped")
- blockManager.dropFromMemory(blockId)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Shutting down block dropper")
- }
- }
- }
- blockDropper.start()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
- entries.synchronized {
+ synchronized {
entries.get(blockId).size
}
}
@@ -52,19 +35,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- ensureFreeSpace(sizeEstimate)
- val entry = new Entry(elements, sizeEstimate, true)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
+ tryToPut(blockId, elements, sizeEstimate, true)
} else {
val entry = new Entry(bytes, bytes.limit, false)
- ensureFreeSpace(bytes.limit)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += bytes.limit
- logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
+ ensureFreeSpace(blockId, bytes.limit)
+ synchronized { entries.put(blockId, entry) }
+ tryToPut(blockId, bytes, bytes.limit, false)
}
}
@@ -79,27 +55,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- ensureFreeSpace(sizeEstimate)
- val entry = new Entry(elements, sizeEstimate, true)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
+ tryToPut(blockId, elements, sizeEstimate, true)
Left(elements.iterator)
} else {
val bytes = blockManager.dataSerialize(values)
- ensureFreeSpace(bytes.limit)
- val entry = new Entry(bytes, bytes.limit, false)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += bytes.limit
- logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
+ tryToPut(blockId, bytes, bytes.limit, false)
Right(bytes)
}
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
- val entry = entries.synchronized {
+ val entry = synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -112,7 +78,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- val entry = entries.synchronized {
+ val entry = synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -126,7 +92,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def remove(blockId: String) {
- entries.synchronized {
+ synchronized {
val entry = entries.get(blockId)
if (entry != null) {
entries.remove(blockId)
@@ -134,54 +100,118 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
} else {
- logWarning("Block " + blockId + " could not be removed as it doesnt exist")
+ logWarning("Block " + blockId + " could not be removed as it does not exist")
}
}
}
override def clear() {
- entries.synchronized {
+ synchronized {
entries.clear()
}
- blockDropper.interrupt()
logInfo("MemoryStore cleared")
}
- // TODO: This should be able to return false if the space is larger than our total memory,
- // or if adding this block would require evicting another one from the same RDD
- private def ensureFreeSpace(space: Long) {
+ /**
+ * Return the RDD ID that a given block ID is from, or null if it is not an RDD block.
+ */
+ private def getRddId(blockId: String): String = {
+ if (blockId.startsWith("rdd_")) {
+ blockId.split('_')(1)
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Try to put in a set of values, if we can free up enough space. The value should either be
+ * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
+ * size must also be passed by the caller.
+ */
+ private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
+ synchronized {
+ if (ensureFreeSpace(blockId, size)) {
+ val entry = new Entry(value, size, deserialized)
+ entries.put(blockId, entry)
+ currentMemory += size
+ if (deserialized) {
+ logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ } else {
+ logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ }
+ true
+ } else {
+ // Tell the block manager that we couldn't put it in memory so that it can drop it to
+ // disk if the block allows disk storage.
+ val data = if (deserialized) {
+ Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ } else {
+ Right(value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
+ false
+ }
+ }
+ }
+
+ /**
+ * Tries to free up a given amount of space to store a particular block, but can fail and return
+ * false if either the block is bigger than our memory or it would require replacing another
+ * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
+ * don't fit into memory that we want to avoid).
+ *
+ * Assumes that a lock on entries is held by the caller.
+ */
+ private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
+ if (space > maxMemory) {
+ logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
+ return false
+ }
+
if (maxMemory - currentMemory < space) {
-
+ val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
- entries.synchronized {
- val iter = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
- val pair = iter.next()
- val blockId = pair.getKey
- val entry = pair.getValue
- if (!entry.dropPending) {
- selectedBlocks += blockId
- entry.dropPending = true
- }
- selectedMemory += pair.getValue.size
- logInfo("Block " + blockId + " selected for dropping")
+ val iterator = entries.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ val pair = iterator.next()
+ val blockId = pair.getKey
+ if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
+ logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
+ "block from the same RDD")
+ return false
}
+ selectedBlocks += blockId
+ selectedMemory += pair.getValue.size
}
- logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
- blocksToDrop.size + " blocks pending")
- var i = 0
- while (i < selectedBlocks.size) {
- blocksToDrop.add(selectedBlocks(i))
- i += 1
+ if (maxMemory - (currentMemory - selectedMemory) >= space) {
+ logInfo(selectedBlocks.size + " blocks selected for dropping")
+ for (blockId <- selectedBlocks) {
+ val entry = entries.get(blockId)
+ val data = if (entry.deserialized) {
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ } else {
+ Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
+ }
+ return true
+ } else {
+ return false
}
- selectedBlocks.clear()
}
+ return true
+ }
+
+ override def contains(blockId: String): Boolean = {
+ synchronized { entries.containsKey(blockId) }
}
}