aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-01 15:21:56 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-01 15:21:56 -0700
commitbc881e479884b3cd19720b9fdc6ac24baa004c90 (patch)
treef7fa82a3e8890eb805e4b424f8eb3ade0d06ccad /core/src/main
parent802aa8aef90fe7d2f0c859c68f12361db286bf20 (diff)
parent8981804c71e329a90541e69cd0fd3eb6148c8b27 (diff)
downloadspark-bc881e479884b3cd19720b9fdc6ac24baa004c90.tar.gz
spark-bc881e479884b3cd19720b9fdc6ac24baa004c90.tar.bz2
spark-bc881e479884b3cd19720b9fdc6ac24baa004c90.zip
Merge branch 'dev' of github.com:mesos/spark into dev
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala62
1 files changed, 32 insertions, 30 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 8be2d08cfc..37d5862575 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -49,7 +49,7 @@ extends Exception(message)
class BlockLocker(numLockers: Int) {
private val hashLocker = Array.fill(numLockers)(new Object())
-
+
def getLock(blockId: String): Object = {
return hashLocker(math.abs(blockId.hashCode % numLockers))
}
@@ -68,13 +68,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore = new DiskStore(this,
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
-
+
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
-
+
val connectionManagerId = connectionManager.id
val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)
-
+
// TODO: This will be removed after cacheTracker is removed from the code base.
var cacheTracker: CacheTracker = null
@@ -121,7 +121,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level == null) {
throw new IllegalArgumentException("Storage level is null")
}
-
+
// If there was earlier info about the block, then use earlier tellMaster
val oldInfo = blockInfo.get(blockId)
val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
@@ -132,12 +132,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// If level is valid, store the block info, else remove the block info
if (level.isValid) {
blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
- logDebug("Info for block " + blockId + " updated with new level as " + level)
+ logDebug("Info for block " + blockId + " updated with new level as " + level)
} else {
blockInfo.remove(blockId)
- logDebug("Info for block " + blockId + " removed as new level is null or invalid")
+ logDebug("Info for block " + blockId + " removed as new level is null or invalid")
}
-
+
// Tell master if necessary
if (newTellMaster) {
master.mustHeartBeat(HeartBeat(
@@ -180,11 +180,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocal(blockId: String): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId)
locker.getLock(blockId).synchronized {
- // Check storage level of block
+ // Check storage level of block
val level = getLevel(blockId)
if (level != null) {
logDebug("Level for block " + blockId + " is " + level + " on local machine")
-
+
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
@@ -216,8 +216,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} else {
logDebug("Block " + blockId + " not registered locally")
}
- }
- return None
+ }
+ return None
}
/**
@@ -431,7 +431,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- val startTimeMs = System.currentTimeMillis
+ val startTimeMs = System.currentTimeMillis
var bytes: ByteBuffer = null
// If we need to replicate the data, we'll want access to the values, but because our
@@ -439,21 +439,21 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// the put serializes data, we'll remember the bytes, above; but for the case where
// it doesn't, such as MEMORY_ONLY_DESER, let's rely on the put returning an Iterator.
var valuesAfterPut: Iterator[Any] = null
-
+
locker.getLock(blockId).synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
-
- // Check and warn if block with same id already exists
+
+ // Check and warn if block with same id already exists
if (getLevel(blockId) != null) {
logWarning("Block " + blockId + " already exists in local machine")
return
}
if (level.useMemory && level.useDisk) {
- // If saving to both memory and disk, then serialize only once
+ // If saving to both memory and disk, then serialize only once
memoryStore.putValues(blockId, values, level, true) match {
- case Left(newValues) =>
+ case Left(newValues) =>
diskStore.putValues(blockId, newValues, level, true) match {
case Right(newBytes) => bytes = newBytes
case _ => throw new Exception("Unexpected return value")
@@ -463,7 +463,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
diskStore.putBytes(blockId, newBytes, level)
}
} else if (level.useMemory) {
- // If only save to memory
+ // If only save to memory
memoryStore.putValues(blockId, values, level, true) match {
case Right(newBytes) => bytes = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
@@ -482,7 +482,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
- // Replicate block if required
+ // Replicate block if required
if (level.replication > 1) {
// Serialize the block if not already done
if (bytes == null) {
@@ -492,7 +492,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
bytes = dataSerialize(valuesAfterPut)
}
- replicate(blockId, bytes, level)
+ replicate(blockId, bytes, level)
}
BlockManager.dispose(bytes)
@@ -520,10 +520,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level == null || !level.isValid) {
throw new IllegalArgumentException("Storage level is null or invalid")
}
-
- val startTimeMs = System.currentTimeMillis
-
- // Initiate the replication before storing it locally. This is faster as
+
+ val startTimeMs = System.currentTimeMillis
+
+ // Initiate the replication before storing it locally. This is faster as
// data is already serialized and ready for sending
val replicationFuture = if (level.replication > 1) {
val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
@@ -559,7 +559,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (blockId.startsWith("rdd")) {
notifyTheCacheTracker(blockId)
}
-
+
// If replication had started, then wait for it to finish
if (level.replication > 1) {
if (replicationFuture == null) {
@@ -569,10 +569,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
if (level.replication > 1) {
- logDebug("PutBytes for block " + blockId + " with replication took " +
+ logDebug("PutBytes for block " + blockId + " with replication took " +
Utils.getUsedTimeMs(startTimeMs))
} else {
- logDebug("PutBytes for block " + blockId + " without replication took " +
+ logDebug("PutBytes for block " + blockId + " without replication took " +
Utils.getUsedTimeMs(startTimeMs))
}
}
@@ -586,7 +586,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
- }
+ }
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
data.rewind()
@@ -706,7 +706,9 @@ object BlockManager extends Logging {
def dispose(buffer: ByteBuffer) {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
logDebug("Unmapping " + buffer)
- buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+ if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
+ buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+ }
}
}
}