From 68c52d80ecd5dd173f755bedc813fdc1a52100aa Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 19 Dec 2012 15:27:23 -0800 Subject: Moved BlockManager's IdGenerator into BlockManager object. Removed some excessive debug messages. --- core/src/main/scala/spark/storage/BlockManager.scala | 9 ++++++--- .../main/scala/spark/storage/BlockManagerMaster.scala | 4 ++-- .../scala/spark/storage/BlockManagerMasterActor.scala | 12 ------------ core/src/main/scala/spark/util/GenerationIdUtil.scala | 19 ------------------- core/src/main/scala/spark/util/IdGenerator.scala | 14 ++++++++++++++ 5 files changed, 22 insertions(+), 36 deletions(-) delete mode 100644 core/src/main/scala/spark/util/GenerationIdUtil.scala create mode 100644 core/src/main/scala/spark/util/IdGenerator.scala diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index eedf6d96e2..682ea7baff 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -19,7 +19,7 @@ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer -import spark.util.{ByteBufferInputStream, GenerationIdUtil, MetadataCleaner, TimeStampedHashMap} +import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap} import sun.nio.ch.DirectBuffer @@ -91,7 +91,7 @@ class BlockManager( val host = System.getProperty("spark.hostname", Utils.localHostName()) val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)), - name = "BlockManagerActor" + GenerationIdUtil.BLOCK_MANAGER.next) + name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next) @volatile private var shuttingDown = false @@ -865,7 +865,7 @@ class BlockManager( blockInfo.remove(blockId) } else { // The block has already been removed; do nothing. - logWarning("Block " + blockId + " does not exist.") + logWarning("Asked to remove block " + blockId + ", which does not exist") } } @@ -951,6 +951,9 @@ class BlockManager( private[spark] object BlockManager extends Logging { + + val ID_GENERATOR = new IdGenerator + def getMaxMemoryFromSystemProperties: Long = { val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble (Runtime.getRuntime.maxMemory * memoryFraction).toLong diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index e8a1e5889f..cb582633c4 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -20,8 +20,8 @@ private[spark] class BlockManagerMaster( masterPort: Int) extends Logging { - val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "5").toInt - val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "100").toInt + val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt + val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager" val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager" diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index e3de8d8e4e..0a1be98d83 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -183,7 +183,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " - logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) if (blockManagerId.ip == Utils.localHostName() && !isLocal) { logInfo("Got Register Msg from master node, don't register it") @@ -200,7 +199,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo( blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor)) } - logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) sender ! true } @@ -227,7 +225,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) sender ! true return } @@ -257,15 +254,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private def getLocations(blockId: String) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockId + " " - logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) if (blockInfo.containsKey(blockId)) { var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] res.appendAll(blockInfo.get(blockId)._2) - logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at " - + Utils.getUsedTimeMs(startTimeMs)) sender ! res.toSeq } else { - logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs)) var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] sender ! res } @@ -274,25 +267,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private def getLocationsMultipleBlockIds(blockIds: Array[String]) { def getLocations(blockId: String): Seq[BlockManagerId] = { val tmp = blockId - logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp) if (blockInfo.containsKey(blockId)) { var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] res.appendAll(blockInfo.get(blockId)._2) - logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq) return res.toSeq } else { - logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp) var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] return res.toSeq } } - logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq) var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]] for (blockId <- blockIds) { res.append(getLocations(blockId)) } - logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq) sender ! res.toSeq } diff --git a/core/src/main/scala/spark/util/GenerationIdUtil.scala b/core/src/main/scala/spark/util/GenerationIdUtil.scala deleted file mode 100644 index 8a17b700b0..0000000000 --- a/core/src/main/scala/spark/util/GenerationIdUtil.scala +++ /dev/null @@ -1,19 +0,0 @@ -package spark.util - -import java.util.concurrent.atomic.AtomicInteger - -private[spark] -object GenerationIdUtil { - - val BLOCK_MANAGER = new IdGenerator - - /** - * A util used to get a unique generation ID. This is a wrapper around - * Java's AtomicInteger. - */ - class IdGenerator { - private var id = new AtomicInteger - - def next: Int = id.incrementAndGet - } -} diff --git a/core/src/main/scala/spark/util/IdGenerator.scala b/core/src/main/scala/spark/util/IdGenerator.scala new file mode 100644 index 0000000000..b6e309fe1a --- /dev/null +++ b/core/src/main/scala/spark/util/IdGenerator.scala @@ -0,0 +1,14 @@ +package spark.util + +import java.util.concurrent.atomic.AtomicInteger + +/** + * A util used to get a unique generation ID. This is a wrapper around Java's + * AtomicInteger. An example usage is in BlockManager, where each BlockManager + * instance would start an Akka actor and we use this utility to assign the Akka + * actors unique names. + */ +private[spark] class IdGenerator { + private var id = new AtomicInteger + def next: Int = id.incrementAndGet +} -- cgit v1.2.3