aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala3
2 files changed, 11 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 3341401c8a..8c23584d60 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -179,21 +179,21 @@ extends Logging {
initialized = false
}
- val BlockSize = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024
+ val BLOCK_SIZE = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024
- def blockifyObject[IN](obj: IN): TorrentInfo = {
- val byteArray = Utils.serialize[IN](obj)
+ def blockifyObject[T](obj: T): TorrentInfo = {
+ val byteArray = Utils.serialize[T](obj)
val bais = new ByteArrayInputStream(byteArray)
- var blockNum = (byteArray.length / BlockSize)
- if (byteArray.length % BlockSize != 0)
+ var blockNum = (byteArray.length / BLOCK_SIZE)
+ if (byteArray.length % BLOCK_SIZE != 0)
blockNum += 1
var retVal = new Array[TorrentBlock](blockNum)
var blockID = 0
- for (i <- 0 until (byteArray.length, BlockSize)) {
- val thisBlockSize = math.min(BlockSize, byteArray.length - i)
+ for (i <- 0 until (byteArray.length, BLOCK_SIZE)) {
+ val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i)
var tempByteArray = new Array[Byte](thisBlockSize)
val hasRead = bais.read(tempByteArray, 0, thisBlockSize)
@@ -208,15 +208,15 @@ extends Logging {
return tInfo
}
- def unBlockifyObject[OUT](arrayOfBlocks: Array[TorrentBlock],
+ def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
totalBytes: Int,
- totalBlocks: Int): OUT = {
+ totalBlocks: Int): T = {
var retByteArray = new Array[Byte](totalBytes)
for (i <- 0 until totalBlocks) {
System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
- i * BlockSize, arrayOfBlocks(i).byteArray.length)
+ i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
}
- Utils.deserialize[OUT](retByteArray, Thread.currentThread.getContextClassLoader)
+ Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 8b2a812d20..f8cf14b503 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -227,9 +227,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
- /* if (id.executorId == "<driver>" && !isLocal) {
- // Got a register message from the master node; don't register it
- } else */
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>