author     Reynold Xin <rxin@cs.berkeley.edu>   2012-12-19 15:27:23 -0800
committer  Reynold Xin <rxin@cs.berkeley.edu>   2012-12-19 15:27:23 -0800
commit     68c52d80ecd5dd173f755bedc813fdc1a52100aa (patch)
tree       fc42ab8fdada6b0bad17df3d75fca20439d955ce
parent     06f855c24d5e4cb77a69671a3b0e2afba7d4e1c0 (diff)
Moved BlockManager's IdGenerator into BlockManager object. Removed some
excessive debug messages.
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala             |  9
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala       |  4
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala  | 12
-rw-r--r--  core/src/main/scala/spark/util/GenerationIdUtil.scala            | 19
-rw-r--r--  core/src/main/scala/spark/util/IdGenerator.scala                 | 14
5 files changed, 22 insertions(+), 36 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index eedf6d96e2..682ea7baff 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -19,7 +19,7 @@ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
-import spark.util.{ByteBufferInputStream, GenerationIdUtil, MetadataCleaner, TimeStampedHashMap}
+import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
import sun.nio.ch.DirectBuffer
@@ -91,7 +91,7 @@ class BlockManager(
val host = System.getProperty("spark.hostname", Utils.localHostName())
val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
- name = "BlockManagerActor" + GenerationIdUtil.BLOCK_MANAGER.next)
+ name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
@volatile private var shuttingDown = false
@@ -865,7 +865,7 @@ class BlockManager(
blockInfo.remove(blockId)
} else {
// The block has already been removed; do nothing.
- logWarning("Block " + blockId + " does not exist.")
+ logWarning("Asked to remove block " + blockId + ", which does not exist")
}
}
@@ -951,6 +951,9 @@ class BlockManager(
private[spark]
object BlockManager extends Logging {
+
+ val ID_GENERATOR = new IdGenerator
+
def getMaxMemoryFromSystemProperties: Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
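
Note on the context lines closing this hunk: getMaxMemoryFromSystemProperties caps block storage at a fraction of the JVM heap. A minimal standalone sketch of that arithmetic, using the spark.storage.memoryFraction property name and 0.66 default visible above (the example object itself is illustrative only):

object StorageMemoryExample {
  def main(args: Array[String]): Unit = {
    // Fraction of the JVM heap that block storage may use; defaults to 0.66.
    val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
    // Runtime.maxMemory reports the -Xmx limit in bytes, so with -Xmx1g the
    // cap works out to roughly 0.66 * 1073741824, about 708 MB.
    val maxStorageBytes = (Runtime.getRuntime.maxMemory * memoryFraction).toLong
    println("Block storage cap: " + maxStorageBytes + " bytes")
  }
}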
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index e8a1e5889f..cb582633c4 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -20,8 +20,8 @@ private[spark] class BlockManagerMaster(
masterPort: Int)
extends Logging {
- val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "5").toInt
- val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "100").toInt
+ val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
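
The two constants above now default to 3 attempts with 3000 ms between them. The retry loop that consumes them is not part of this diff; the following is only a minimal sketch of how an attempts/interval pair like this is typically used, with a hypothetical retry helper and askMaster call standing in for the real code:

object RetryExample {
  // Illustrative only: a generic retry helper driven by an attempt count and a
  // wait interval, mirroring AKKA_RETRY_ATTEMPS / AKKA_RETRY_INTERVAL_MS above.
  def retry[T](attempts: Int, intervalMs: Long)(op: => T): T = {
    var lastError: Throwable = null
    var attempt = 0
    while (attempt < attempts) {
      try {
        return op  // success: hand the result straight back
      } catch {
        case e: Exception =>
          lastError = e
          attempt += 1
          if (attempt < attempts) Thread.sleep(intervalMs)  // wait before the next try
      }
    }
    throw new RuntimeException("Failed after " + attempts + " attempts", lastError)
  }

  def main(args: Array[String]): Unit = {
    // Pretend askMaster sometimes fails transiently; at most 3 tries, 3000 ms apart.
    def askMaster(): String =
      if (scala.util.Random.nextBoolean()) "ok" else throw new RuntimeException("transient failure")
    println(retry(3, 3000) { askMaster() })
  }
}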
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index e3de8d8e4e..0a1be98d83 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -183,7 +183,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
- logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
@@ -200,7 +199,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
}
- logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
@@ -227,7 +225,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
return
}
@@ -257,15 +254,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private def getLocations(blockId: String) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockId + " "
- logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
- + Utils.getUsedTimeMs(startTimeMs))
sender ! res.toSeq
} else {
- logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
sender ! res
}
@@ -274,25 +267,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
def getLocations(blockId: String): Seq[BlockManagerId] = {
val tmp = blockId
- logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
return res.toSeq
} else {
- logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
return res.toSeq
}
}
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
for (blockId <- blockIds) {
res.append(getLocations(blockId))
}
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
sender ! res.toSeq
}
diff --git a/core/src/main/scala/spark/util/GenerationIdUtil.scala b/core/src/main/scala/spark/util/GenerationIdUtil.scala
deleted file mode 100644
index 8a17b700b0..0000000000
--- a/core/src/main/scala/spark/util/GenerationIdUtil.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package spark.util
-
-import java.util.concurrent.atomic.AtomicInteger
-
-private[spark]
-object GenerationIdUtil {
-
- val BLOCK_MANAGER = new IdGenerator
-
- /**
- * A util used to get a unique generation ID. This is a wrapper around
- * Java's AtomicInteger.
- */
- class IdGenerator {
- private var id = new AtomicInteger
-
- def next: Int = id.incrementAndGet
- }
-}
diff --git a/core/src/main/scala/spark/util/IdGenerator.scala b/core/src/main/scala/spark/util/IdGenerator.scala
new file mode 100644
index 0000000000..b6e309fe1a
--- /dev/null
+++ b/core/src/main/scala/spark/util/IdGenerator.scala
@@ -0,0 +1,14 @@
+package spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A util used to get a unique generation ID. This is a wrapper around Java's
+ * AtomicInteger. An example usage is in BlockManager, where each BlockManager
+ * instance would start an Akka actor and we use this utility to assign the Akka
+ * actors unique names.
+ */
+private[spark] class IdGenerator {
+ private var id = new AtomicInteger
+ def next: Int = id.incrementAndGet
+}
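
To illustrate how the new class is meant to be used, here is a small self-contained sketch mirroring the BlockManager hunk earlier in this diff; the example object and printed names are for illustration only:

import java.util.concurrent.atomic.AtomicInteger

// Same shape as the new spark.util.IdGenerator added above.
class IdGenerator {
  private val id = new AtomicInteger
  def next: Int = id.incrementAndGet
}

object IdGeneratorExample {
  // One process-wide generator, as in the BlockManager companion object.
  val ID_GENERATOR = new IdGenerator

  def main(args: Array[String]): Unit = {
    // Each call returns a fresh integer, so concurrently created slave actors
    // get distinct names even when several BlockManagers share one JVM.
    val name1 = "BlockManagerActor" + ID_GENERATOR.next  // "BlockManagerActor1"
    val name2 = "BlockManagerActor" + ID_GENERATOR.next  // "BlockManagerActor2"
    println(name1 + ", " + name2)
  }
}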