diff options
6 files changed, 375 insertions, 308 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index bd9155ef29..70d6d8369d 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -50,16 +50,6 @@ private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) - -private[spark] class BlockLocker(numLockers: Int) { - private val hashLocker = Array.fill(numLockers)(new Object()) - - def getLock(blockId: String): Object = { - return hashLocker(math.abs(blockId.hashCode % numLockers)) - } -} - - private[spark] class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) extends Logging { @@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - private val NUM_LOCKS = 337 - private val locker = new BlockLocker(NUM_LOCKS) - - private val blockInfo = new ConcurrentHashMap[String, BlockInfo]() + private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000) private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore: BlockStore = @@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val maxBytesInFlight = System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 + // Whether to compress broadcast variables that are stored val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean + // Whether to compress shuffle output that are stored val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean // Whether to compress RDD partitions that are stored serialized val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean @@ -131,8 +120,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * BlockManagerWorker actor. */ private def initialize() { - master.mustRegisterBlockManager( - RegisterBlockManager(blockManagerId, maxMemory)) + master.registerBlockManager(blockManagerId, maxMemory) BlockManagerWorker.startBlockManagerWorker(this) } @@ -150,28 +138,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. */ def reportBlockStatus(blockId: String) { - locker.getLock(blockId).synchronized { - val curLevel = blockInfo.get(blockId) match { - case null => - StorageLevel.NONE - case info => + val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match { + case null => + (StorageLevel.NONE, 0L, 0L) + case info => + info.synchronized { info.level match { case null => - StorageLevel.NONE + (StorageLevel.NONE, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) - new StorageLevel(onDisk, inMem, level.deserialized, level.replication) + ( + new StorageLevel(onDisk, inMem, level.deserialized, level.replication), + if (inMem) memoryStore.getSize(blockId) else 0L, + if (onDisk) diskStore.getSize(blockId) else 0L + ) } - } - master.mustHeartBeat(HeartBeat( - blockManagerId, - blockId, - curLevel, - if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L, - if (curLevel.useDisk) diskStore.getSize(blockId) else 0L)) - logDebug("Told master about block " + blockId) + } } + master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize) + logDebug("Told master about block " + blockId) } /** @@ -179,7 +166,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def getLocations(blockId: String): Seq[String] = { val startTimeMs = System.currentTimeMillis - var managers = master.mustGetLocations(GetLocations(blockId)) + var managers = master.getLocations(blockId) val locations = managers.map(_.ip) logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs)) return locations @@ -190,8 +177,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def getLocations(blockIds: Array[String]): Array[Seq[String]] = { val startTimeMs = System.currentTimeMillis - val locations = master.mustGetLocationsMultipleBlockIds( - GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray + val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) return locations } @@ -213,9 +199,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - if (info != null) { + val info = blockInfo.get(blockId) + if (info != null) { + info.synchronized { info.waitForReady() // In case the block is still being put() by another thread val level = info.level logDebug("Level for block " + blockId + " is " + level) @@ -273,9 +259,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } } - } else { - logDebug("Block " + blockId + " not registered locally") } + } else { + logDebug("Block " + blockId + " not registered locally") } return None } @@ -298,9 +284,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - if (info != null) { + val info = blockInfo.get(blockId) + if (info != null) { + info.synchronized { info.waitForReady() // In case the block is still being put() by another thread val level = info.level logDebug("Level for block " + blockId + " is " + level) @@ -338,10 +324,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new Exception("Block " + blockId + " not found on disk, though it should be") } } - } else { - logDebug("Block " + blockId + " not registered locally") } + } else { + logDebug("Block " + blockId + " not registered locally") } + return None } @@ -354,7 +341,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } logDebug("Getting remote block " + blockId) // Get locations of block - val locations = master.mustGetLocations(GetLocations(blockId)) + val locations = master.getLocations(blockId) // Get block from remote locations for (loc <- locations) { @@ -583,7 +570,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Size of the block in bytes (to return to caller) var size = 0L - locker.getLock(blockId).synchronized { + myInfo.synchronized { logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") @@ -681,7 +668,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m null } - locker.getLock(blockId).synchronized { + myInfo.synchronized { logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") @@ -732,7 +719,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val tLevel: StorageLevel = new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) if (cachedPeers == null) { - cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1)) + cachedPeers = master.getPeers(blockManagerId, level.replication - 1) } for (peer: BlockManagerId <- cachedPeers) { val start = System.nanoTime @@ -779,26 +766,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) { logInfo("Dropping block " + blockId + " from memory") - locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - val level = info.level - if (level.useDisk && !diskStore.contains(blockId)) { - logInfo("Writing block " + blockId + " to disk") - data match { - case Left(elements) => - diskStore.putValues(blockId, elements, level, false) - case Right(bytes) => - diskStore.putBytes(blockId, bytes, level) + val info = blockInfo.get(blockId) + if (info != null) { + info.synchronized { + val level = info.level + if (level.useDisk && !diskStore.contains(blockId)) { + logInfo("Writing block " + blockId + " to disk") + data match { + case Left(elements) => + diskStore.putValues(blockId, elements, level, false) + case Right(bytes) => + diskStore.putBytes(blockId, bytes, level) + } + } + memoryStore.remove(blockId) + if (info.tellMaster) { + reportBlockStatus(blockId) + } + if (!level.useDisk) { + // The block is completely gone from this node; forget it so we can put() it again later. + blockInfo.remove(blockId) } } - memoryStore.remove(blockId) - if (info.tellMaster) { - reportBlockStatus(blockId) - } - if (!level.useDisk) { - // The block is completely gone from this node; forget it so we can put() it again later. - blockInfo.remove(blockId) - } + } else { + // The block has already been dropped } } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index b3345623b3..4d5ee8318c 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -26,7 +26,7 @@ case class RegisterBlockManager( extends ToBlockManagerMaster private[spark] -class HeartBeat( +class UpdateBlockInfo( var blockManagerId: BlockManagerId, var blockId: String, var storageLevel: StorageLevel, @@ -57,17 +57,17 @@ class HeartBeat( } private[spark] -object HeartBeat { +object UpdateBlockInfo { def apply(blockManagerId: BlockManagerId, blockId: String, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): HeartBeat = { - new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize) + diskSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) } // For pattern-matching - def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { + def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) } } @@ -182,8 +182,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor case RegisterBlockManager(blockManagerId, maxMemSize) => register(blockManagerId, maxMemSize) - case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) => - heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) + case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) case GetLocations(blockId) => getLocations(blockId) @@ -233,7 +233,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor sender ! true } - private def heartBeat( + private def updateBlockInfo( blockManagerId: BlockManagerId, blockId: String, storageLevel: StorageLevel, @@ -245,7 +245,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) + logDebug("Got in updateBlockInfo 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) sender ! true } @@ -350,211 +350,124 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean) extends Logging { - val AKKA_ACTOR_NAME: String = "BlockMasterManager" - val REQUEST_RETRY_INTERVAL_MS = 100 - val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost") - val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt - val DEFAULT_MANAGER_IP: String = Utils.localHostName() - val DEFAULT_MANAGER_PORT: String = "10902" - + val actorName = "BlockMasterManager" val timeout = 10.seconds - var masterActor: ActorRef = null + val maxAttempts = 5 - if (isMaster) { - masterActor = actorSystem.actorOf( - Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME) + var masterActor = if (isMaster) { + val actor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), name = actorName) logInfo("Registered BlockManagerMaster Actor") + actor } else { - val url = "akka://spark@%s:%s/user/%s".format( - DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME) + val host = System.getProperty("spark.master.host", "localhost") + val port = System.getProperty("spark.master.port", "7077").toInt + val url = "akka://spark@%s:%s/user/%s".format(host, port, actorName) + val actor = actorSystem.actorFor(url) logInfo("Connecting to BlockManagerMaster: " + url) - masterActor = actorSystem.actorFor(url) + actor } - def stop() { - if (masterActor != null) { - communicate(StopBlockManagerMaster) - masterActor = null - logInfo("BlockManagerMaster stopped") + /** + * Send a message to the master actor and get its result within a default timeout, or + * throw a SparkException if this fails. + */ + private def ask[T](message: Any): T = { + // TODO: Consider removing multiple attempts + if (masterActor == null) { + throw new SparkException("Error sending message to BlockManager as masterActor is null " + + "[message = " + message + "]") } - } - - // Send a message to the master actor and get its result within a default timeout, or - // throw a SparkException if this fails. - def askMaster(message: Any): Any = { - try { - val future = masterActor.ask(message)(timeout) - return Await.result(future, timeout) - } catch { - case e: Exception => - throw new SparkException("Error communicating with BlockManagerMaster", e) + var attempts = 0 + var lastException: Exception = null + while (attempts < maxAttempts) { + attempts += 1 + try { + val future = masterActor.ask(message)(timeout) + val result = Await.result(future, timeout) + if (result == null) { + throw new Exception("BlockManagerMaster returned null") + } + return result.asInstanceOf[T] + } catch { + case ie: InterruptedException => + throw ie + case e: Exception => + lastException = e + logWarning( + "Error sending message to BlockManagerMaster in " + attempts + " attempts", e) + } + Thread.sleep(100) } + throw new SparkException( + "Error sending message to BlockManagerMaster [message = " + message + "]", lastException) } - // Send a one-way message to the master actor, to which we expect it to reply with true. - def communicate(message: Any) { - if (askMaster(message) != true) { - throw new SparkException("Error reply received from BlockManagerMaster") + /** + * Send a one-way message to the master actor, to which we expect it to reply with true + */ + private def tell(message: Any) { + if (!ask[Boolean](message)) { + throw new SparkException("Telling master a message returned false") } } - def notifyADeadHost(host: String) { - communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT)) - logInfo("Removed " + host + " successfully in notifyADeadHost") - } - - def mustRegisterBlockManager(msg: RegisterBlockManager) { + /** + * Register the BlockManager's id with the master + */ + def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long) { logInfo("Trying to register BlockManager") - while (! syncRegisterBlockManager(msg)) { - logWarning("Failed to register " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - } - logInfo("Done registering BlockManager") - } - - def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = { - //val masterActor = RemoteActor.select(node, name) - val startTimeMs = System.currentTimeMillis() - val tmp = " msg " + msg + " " - logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - - try { - communicate(msg) - logInfo("BlockManager registered successfully @ syncRegisterBlockManager") - logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - return true - } catch { - case e: Exception => - logError("Failed in syncRegisterBlockManager", e) - return false - } + tell(RegisterBlockManager(blockManagerId, maxMemSize)) + logInfo("Registered BlockManager") } - def mustHeartBeat(msg: HeartBeat) { - while (! syncHeartBeat(msg)) { - logWarning("Failed to send heartbeat" + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - } - } - - def syncHeartBeat(msg: HeartBeat): Boolean = { - val startTimeMs = System.currentTimeMillis() - val tmp = " msg " + msg + " " - logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs)) - - try { - communicate(msg) - logDebug("Heartbeat sent successfully") - logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs)) - return true - } catch { - case e: Exception => - logError("Failed in syncHeartBeat", e) - return false - } + def updateBlockInfo( + blockManagerId: BlockManagerId, + blockId: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long + ) { + tell(UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) + logInfo("Updated info of block " + blockId) } - def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = { - var res = syncGetLocations(msg) - while (res == null) { - logInfo("Failed to get locations " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - res = syncGetLocations(msg) - } - return res + /** Get locations of the blockId from the master */ + def getLocations(blockId: String): Seq[BlockManagerId] = { + ask[Seq[BlockManagerId]](GetLocations(blockId)) } - def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = { - val startTimeMs = System.currentTimeMillis() - val tmp = " msg " + msg + " " - logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - - try { - val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]] - if (answer != null) { - logDebug("GetLocations successful") - logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - return answer - } else { - logError("Master replied null in response to GetLocations") - return null - } - } catch { - case e: Exception => - logError("GetLocations failed", e) - return null - } + /** Get locations of multiple blockIds from the master */ + def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = { + ask[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) } - def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): - Seq[Seq[BlockManagerId]] = { - var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg) - while (res == null) { - logWarning("Failed to GetLocationsMultipleBlockIds " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - res = syncGetLocationsMultipleBlockIds(msg) + /** Get ids of other nodes in the cluster from the master */ + def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { + val result = ask[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) + if (result.length != numPeers) { + throw new SparkException( + "Error getting peers, only got " + result.size + " instead of " + numPeers) } - return res + result } - def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): - Seq[Seq[BlockManagerId]] = { - val startTimeMs = System.currentTimeMillis - val tmp = " msg " + msg + " " - logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - - try { - val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]] - if (answer != null) { - logDebug("GetLocationsMultipleBlockIds successful") - logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp + - Utils.getUsedTimeMs(startTimeMs)) - return answer - } else { - logError("Master replied null in response to GetLocationsMultipleBlockIds") - return null - } - } catch { - case e: Exception => - logError("GetLocationsMultipleBlockIds failed", e) - return null - } + /** Notify the master of a dead node */ + def notifyADeadHost(host: String) { + tell(RemoveHost(host + ":10902")) + logInfo("Told BlockManagerMaster to remove dead host " + host) } - def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = { - var res = syncGetPeers(msg) - while ((res == null) || (res.length != msg.size)) { - logInfo("Failed to get peers " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - res = syncGetPeers(msg) - } - - return res + /** Get the memory status form the master */ + def getMemoryStatus(): Map[BlockManagerId, (Long, Long)] = { + ask[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) } - def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = { - val startTimeMs = System.currentTimeMillis - val tmp = " msg " + msg + " " - logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - - try { - val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]] - if (answer != null) { - logDebug("GetPeers successful") - logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - return answer - } else { - logError("Master replied null in response to GetPeers") - return null - } - } catch { - case e: Exception => - logError("GetPeers failed", e) - return null + /** Stop the master actor, called only on the Spark master node */ + def stop() { + if (masterActor != null) { + tell(StopBlockManagerMaster) + masterActor = null + logInfo("BlockManagerMaster stopped") } } - - def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { - askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]] - } } diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 773970446a..09769d1f7d 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -17,13 +17,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) private var currentMemory = 0L + // Object used to ensure that only one thread is putting blocks and if necessary, dropping + // blocks from the memory store. + private val putLock = new Object() logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory))) def freeMemory: Long = maxMemory - currentMemory override def getSize(blockId: String): Long = { - synchronized { + entries.synchronized { entries.get(blockId).size } } @@ -38,8 +41,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) tryToPut(blockId, elements, sizeEstimate, true) } else { val entry = new Entry(bytes, bytes.limit, false) - ensureFreeSpace(blockId, bytes.limit) - synchronized { entries.put(blockId, entry) } tryToPut(blockId, bytes, bytes.limit, false) } } @@ -63,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def getBytes(blockId: String): Option[ByteBuffer] = { - val entry = synchronized { + val entry = entries.synchronized { entries.get(blockId) } if (entry == null) { @@ -76,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def getValues(blockId: String): Option[Iterator[Any]] = { - val entry = synchronized { + val entry = entries.synchronized { entries.get(blockId) } if (entry == null) { @@ -90,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def remove(blockId: String) { - synchronized { + entries.synchronized { val entry = entries.get(blockId) if (entry != null) { entries.remove(blockId) @@ -104,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def clear() { - synchronized { + entries.synchronized { entries.clear() } logInfo("MemoryStore cleared") @@ -125,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Try to put in a set of values, if we can free up enough space. The value should either be * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) * size must also be passed by the caller. + * + * Locks on the object putLock to ensure that all the put requests and its associated block + * dropping is done by only on thread at a time. Otherwise while one thread is dropping + * blocks to free memory for one block, another thread may use up the freed space for + * another block. */ private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = { - synchronized { + // TODO: Its possible to optimize the locking by locking entries only when selecting blocks + // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been + // released, it must be ensured that those to-be-dropped blocks are not double counted for + // freeing up more space for another block that needs to be put. Only then the actually dropping + // of blocks (and writing to disk if necessary) can proceed in parallel. + putLock.synchronized { if (ensureFreeSpace(blockId, size)) { val entry = new Entry(value, size, deserialized) - entries.put(blockId, entry) + entries.synchronized { entries.put(blockId, entry) } currentMemory += size if (deserialized) { logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( @@ -160,8 +171,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space - * might fill up before the caller puts in their new value.) + * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks. + * Otherwise, the freed space may fill up before the caller puts in their new value. */ private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = { logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( @@ -172,36 +183,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) return false } - // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks - // in order to allow parallelism in writing to disk if (maxMemory - currentMemory < space) { val rddToAdd = getRddId(blockIdToAdd) val selectedBlocks = new ArrayBuffer[String]() var selectedMemory = 0L - val iterator = entries.entrySet().iterator() - while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { - val pair = iterator.next() - val blockId = pair.getKey - if (rddToAdd != null && rddToAdd == getRddId(blockId)) { - logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + - "block from the same RDD") - return false + // This is synchronized to ensure that the set of entries is not changed + // (because of getValue or getBytes) while traversing the iterator, as that + // can lead to exceptions. + entries.synchronized { + val iterator = entries.entrySet().iterator() + while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { + val pair = iterator.next() + val blockId = pair.getKey + if (rddToAdd != null && rddToAdd == getRddId(blockId)) { + logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + + "block from the same RDD") + return false + } + selectedBlocks += blockId + selectedMemory += pair.getValue.size } - selectedBlocks += blockId - selectedMemory += pair.getValue.size } if (maxMemory - (currentMemory - selectedMemory) >= space) { logInfo(selectedBlocks.size + " blocks selected for dropping") for (blockId <- selectedBlocks) { - val entry = entries.get(blockId) - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + val entry = entries.synchronized { entries.get(blockId) } + // This should never be null as only one thread should be dropping + // blocks and removing entries. However the check is still here for + // future safety. + if (entry != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + } + blockManager.dropFromMemory(blockId, data) } - blockManager.dropFromMemory(blockId, data) } return true } else { @@ -212,7 +231,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def contains(blockId: String): Boolean = { - synchronized { entries.containsKey(blockId) } + entries.synchronized { entries.containsKey(blockId) } } } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index b9c19e61cd..0e78228134 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -20,9 +20,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT var oldOops: String = null // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + System.setProperty("spark.kryoserializer.buffer.mb", "1") val serializer = new KryoSerializer before { + actorSystem = ActorSystem("test") master = new BlockManagerMaster(actorSystem, true, true) @@ -55,7 +57,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } } - test("manager-master interaction") { + test("master + 1 manager interaction") { store = new BlockManager(master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -72,17 +74,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a3") != None, "a3 was not in store") // Checking whether master knows about the blocks or not - assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") - assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") - assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") + assert(master.getLocations("a1").size === 1, "master was not told about a1") + assert(master.getLocations("a2").size === 1, "master was not told about a2") + assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master store.dropFromMemory("a1", null) store.dropFromMemory("a2", null) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") - assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") - assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2") + assert(master.getLocations("a1").size === 0, "master did not remove a1") + assert(master.getLocations("a2").size === 0, "master did not remove a2") + } + + test("master + 2 managers interaction") { + store = new BlockManager(master, serializer, 2000) + val otherStore = new BlockManager(master, new KryoSerializer, 2000) + + val peers = master.getPeers(store.blockManagerId, 1) + assert(peers.size === 1, "master did not return the other manager as a peer") + assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager") + + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2) + otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) + assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1") + assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2") } test("in-memory LRU storage") { diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala index 9d7361097b..88856364d2 100644 --- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala @@ -6,7 +6,8 @@ import spark.rdd.UnionRDD import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import java.io.{ObjectInputStream, IOException} + +import scala.collection.mutable.HashSet class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @@ -19,7 +20,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null - var lastModTime: Long = 0 + var lastModTime = 0L + val lastModTimeFiles = new HashSet[String]() def path(): Path = { if (path_ == null) path_ = new Path(directory) @@ -40,22 +42,37 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } override def stop() { } - + + /** + * Finds the files that were modified since the last time this method was called and makes + * a union RDD out of them. Note that this maintains the list of files that were processed + * in the latest modification time in the previous call to this method. This is because the + * modification time returned by the FileStatus API seems to return times only at the + * granularity of seconds. Hence, new files may have the same modification time as the + * latest modification time in the previous call to this method and the list of files + * maintained is used to filter the one that have been processed. + */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { + // Create the filter for selecting new files val newFilter = new PathFilter() { var latestModTime = 0L - + val latestModTimeFiles = new HashSet[String]() + def accept(path: Path): Boolean = { if (!filter.accept(path)) { return false } else { val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime <= lastModTime) { + if (modTime < lastModTime){ + return false + } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { return false } if (modTime > latestModTime) { latestModTime = modTime + latestModTimeFiles.clear() } + latestModTimeFiles += path.toString return true } } @@ -64,7 +81,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K val newFiles = fs.listStatus(path, newFilter) logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) if (newFiles.length > 0) { - lastModTime = newFilter.latestModTime + // Update the modification time and the files processed for that modification time + if (lastModTime != newFilter.latestModTime) { + lastModTime = newFilter.latestModTime + lastModTimeFiles.clear() + } + lastModTimeFiles ++= newFilter.latestModTimeFiles } val newRDD = new UnionRDD(ssc.sc, newFiles.map( file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 8f892baab1..0957748603 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -9,12 +9,19 @@ import spark.storage.StorageLevel import spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfter -class InputStreamsSuite extends TestSuiteBase { +class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + override def checkpointDir = "checkpoint" + + after { + FileUtils.deleteDirectory(new File(checkpointDir)) + } + test("network input stream") { // Start the server val serverPort = 9999 @@ -30,7 +37,7 @@ class InputStreamsSuite extends TestSuiteBase { ssc.registerOutputStream(outputStream) ssc.start() - // Feed data to the server to send to the Spark Streaming network receiver + // Feed data to the server to send to the network receiver val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) val expectedOutput = input.map(_.toString) @@ -52,7 +59,7 @@ class InputStreamsSuite extends TestSuiteBase { logInfo("Stopping context") ssc.stop() - // Verify whether data received by Spark Streaming was as expected + // Verify whether data received was as expected logInfo("--------------------------------") logInfo("output.size = " + outputBuffer.size) logInfo("output") @@ -69,6 +76,49 @@ class InputStreamsSuite extends TestSuiteBase { } } + test("network input stream with checkpoint") { + // Start the server + val serverPort = 9999 + val server = new TestServer(9999) + server.start() + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK) + var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Feed data to the server to send to the network receiver + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(1, 2, 3)) { + server.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and feed more data to see whether + // they are being received and processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(4, 5, 6)) { + server.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] + assert(outputStream.output.size > 0) + ssc.stop() + } + test("file input stream") { // Create a temporary directory val dir = { @@ -76,7 +126,7 @@ class InputStreamsSuite extends TestSuiteBase { temp.delete() temp.mkdirs() temp.deleteOnExit() - println("Created temp dir " + temp) + logInfo("Created temp dir " + temp) temp } @@ -84,7 +134,9 @@ class InputStreamsSuite extends TestSuiteBase { val ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) val filestream = ssc.textFileStream(dir.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + def output = outputBuffer.flatMap(x => x) + val outputStream = new TestOutputStream(filestream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -96,36 +148,88 @@ class InputStreamsSuite extends TestSuiteBase { Thread.sleep(1000) for (i <- 0 until input.size) { FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n") - Thread.sleep(500) + Thread.sleep(100) clock.addToTime(batchDuration.milliseconds) - Thread.sleep(500) + Thread.sleep(100) } val startTime = System.currentTimeMillis() - while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size) + while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) Thread.sleep(100) } Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - println("Stopping context") + logInfo("Stopping context") ssc.stop() // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) + logInfo("output.size = " + output.size) logInfo("output") - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("expected output.size = " + expectedOutput.size) logInfo("expected output") expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") - assert(outputBuffer.size === expectedOutput.size) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - assert(outputBuffer(i).head === expectedOutput(i)) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i).size === 1) + assert(output(i).head.toString === expectedOutput(i)) + } + } + + test("file input stream with checkpoint") { + // Create a temporary directory + val dir = { + var temp = File.createTempFile(".temp.", Random.nextInt().toString) + temp.delete() + temp.mkdirs() + temp.deleteOnExit() + println("Created temp dir " + temp) + temp } + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val filestream = ssc.textFileStream(dir.toString) + var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Create files and advance manual clock to process them + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(1000) + for (i <- Seq(1, 2, 3)) { + FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and create more files to see whether + // they are being processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(500) + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() } } |