diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index e7dea904c3..ffaf51b34d 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -49,7 +49,7 @@ extends Exception(message) class BlockLocker(numLockers: Int) { private val hashLocker = Array.fill(numLockers)(new Object()) - + def getLock(blockId: String): Object = { return hashLocker(math.abs(blockId.hashCode % numLockers)) } @@ -68,13 +68,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore: BlockStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) - + val connectionManager = new ConnectionManager(0) implicit val futureExecContext = connectionManager.futureExecContext - + val connectionManagerId = connectionManager.id val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port) - + // TODO: This will be removed after cacheTracker is removed from the code base. var cacheTracker: CacheTracker = null @@ -123,7 +123,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (level == null) { throw new IllegalArgumentException("Storage level is null") } - + // If there was earlier info about the block, then use earlier tellMaster val oldInfo = blockInfo.get(blockId) val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster @@ -134,12 +134,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // If level is valid, store the block info, else remove the block info if (level.isValid) { blockInfo.put(blockId, new BlockInfo(level, newTellMaster)) - logDebug("Info for block " + blockId + " updated with new level as " + level) + logDebug("Info for block " + blockId + " updated with new level as " + level) } else { blockInfo.remove(blockId) - logDebug("Info for block " + blockId + " removed as new level is null or invalid") + logDebug("Info for block " + blockId + " removed as new level is null or invalid") } - + // Tell master if necessary if (newTellMaster) { master.mustHeartBeat(HeartBeat( @@ -182,11 +182,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def getLocal(blockId: String): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) locker.getLock(blockId).synchronized { - // Check storage level of block + // Check storage level of block val level = getLevel(blockId) if (level != null) { logDebug("Level for block " + blockId + " is " + level + " on local machine") - + // Look for the block in memory if (level.useMemory) { logDebug("Getting block " + blockId + " from memory") @@ -218,8 +218,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } else { logDebug("Block " + blockId + " not registered locally") } - } - return None + } + return None } /** @@ -433,7 +433,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new IllegalArgumentException("Storage level is null or invalid") } - val startTimeMs = System.currentTimeMillis + val startTimeMs = System.currentTimeMillis var bytes: ByteBuffer = null // If we need to replicate the data, we'll want access to the values, but because our @@ -441,21 +441,21 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // the put serializes data, we'll remember the bytes, above; but for the case where // it doesn't, such as MEMORY_ONLY_DESER, let's rely on the put returning an Iterator. var valuesAfterPut: Iterator[Any] = null - + locker.getLock(blockId).synchronized { logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") - - // Check and warn if block with same id already exists + + // Check and warn if block with same id already exists if (getLevel(blockId) != null) { logWarning("Block " + blockId + " already exists in local machine") return } if (level.useMemory && level.useDisk) { - // If saving to both memory and disk, then serialize only once + // If saving to both memory and disk, then serialize only once memoryStore.putValues(blockId, values, level, true) match { - case Left(newValues) => + case Left(newValues) => diskStore.putValues(blockId, newValues, level, true) match { case Right(newBytes) => bytes = newBytes case _ => throw new Exception("Unexpected return value") @@ -465,7 +465,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m diskStore.putBytes(blockId, newBytes, level) } } else if (level.useMemory) { - // If only save to memory + // If only save to memory memoryStore.putValues(blockId, values, level, true) match { case Right(newBytes) => bytes = newBytes case Left(newIterator) => valuesAfterPut = newIterator @@ -484,7 +484,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) - // Replicate block if required + // Replicate block if required if (level.replication > 1) { // Serialize the block if not already done if (bytes == null) { @@ -494,7 +494,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } bytes = dataSerialize(valuesAfterPut) } - replicate(blockId, bytes, level) + replicate(blockId, bytes, level) } BlockManager.dispose(bytes) @@ -522,10 +522,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (level == null || !level.isValid) { throw new IllegalArgumentException("Storage level is null or invalid") } - - val startTimeMs = System.currentTimeMillis - - // Initiate the replication before storing it locally. This is faster as + + val startTimeMs = System.currentTimeMillis + + // Initiate the replication before storing it locally. This is faster as // data is already serialized and ready for sending val replicationFuture = if (level.replication > 1) { val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper @@ -561,7 +561,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (blockId.startsWith("rdd")) { notifyTheCacheTracker(blockId) } - + // If replication had started, then wait for it to finish if (level.replication > 1) { if (replicationFuture == null) { @@ -571,10 +571,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } if (level.replication > 1) { - logDebug("PutBytes for block " + blockId + " with replication took " + + logDebug("PutBytes for block " + blockId + " with replication took " + Utils.getUsedTimeMs(startTimeMs)) } else { - logDebug("PutBytes for block " + blockId + " without replication took " + + logDebug("PutBytes for block " + blockId + " without replication took " + Utils.getUsedTimeMs(startTimeMs)) } } @@ -588,7 +588,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) if (cachedPeers == null) { cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1)) - } + } for (peer: BlockManagerId <- cachedPeers) { val start = System.nanoTime data.rewind() @@ -708,7 +708,9 @@ object BlockManager extends Logging { def dispose(buffer: ByteBuffer) { if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) { logDebug("Unmapping " + buffer) - buffer.asInstanceOf[DirectBuffer].cleaner().clean() + if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) { + buffer.asInstanceOf[DirectBuffer].cleaner().clean() + } } } } |