aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/storage/BlockManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/storage/BlockManager.scala')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala36
1 files changed, 14 insertions, 22 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bb6fc34f5d..4e7d11996f 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,7 +1,9 @@
package spark.storage
+import akka.actor.{ActorSystem, Cancellable}
import akka.dispatch.{Await, Future}
import akka.util.Duration
+import akka.util.duration._
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
@@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
-import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.ByteBufferInputStream
@@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
-
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark]
-class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
+ val serializer: Serializer, maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -116,28 +118,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- val heartBeatThread = new Thread("BlockManager heartbeat") {
- setDaemon(true)
-
- override def run: Unit = {
- while (!shuttingDown) {
- heartBeat()
- try {
- Thread.sleep(heartBeatFrequency)
- } catch {
- case e: InterruptedException => {}
- }
- }
- }
- }
+ var heartBeatTask: Cancellable = null
initialize()
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(master: BlockManagerMaster, serializer: Serializer) = {
- this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
+ this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
@@ -149,7 +138,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
- heartBeatThread.start()
+ heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
+ heartBeat()
+ }
}
}
@@ -914,8 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
- shuttingDown = true
- heartBeatThread.interrupt()
+ if (heartBeatTask != null) {
+ heartBeatTask.cancel()
+ }
connectionManager.stop()
blockInfo.clear()
memoryStore.clear()