aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCharles Reiss <charles@eecs.berkeley.edu>2012-12-10 00:10:57 -0800
committerCharles Reiss <charles@eecs.berkeley.edu>2012-12-10 00:31:50 -0800
commit5d3e917d09241c783a0e826caae9b85cf5b044bf (patch)
tree5408191e74a3aee440cfc7acdc25635785e1df48 /core
parentb53dd28c908580bf84f798eb39cf4449d6dab216 (diff)
downloadspark-5d3e917d09241c783a0e826caae9b85cf5b044bf.tar.gz
spark-5d3e917d09241c783a0e826caae9b85cf5b044bf.tar.bz2
spark-5d3e917d09241c783a0e826caae9b85cf5b044bf.zip
Use Akka scheduler for BlockManager heart beats.
Adds required ActorSystem argument to BlockManager constructors.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala36
-rw-r--r--core/src/main/scala/spark/storage/ThreadingTest.scala2
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala50
4 files changed, 41 insertions, 49 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 9f2b0c42c7..272d7cdad3 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -88,7 +88,7 @@ object SparkEnv extends Logging {
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
- val blockManager = new BlockManager(blockManagerMaster, serializer)
+ val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bb6fc34f5d..4e7d11996f 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,7 +1,9 @@
package spark.storage
+import akka.actor.{ActorSystem, Cancellable}
import akka.dispatch.{Await, Future}
import akka.util.Duration
+import akka.util.duration._
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
@@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
-import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.ByteBufferInputStream
@@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
-
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark]
-class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
+ val serializer: Serializer, maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -116,28 +118,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- val heartBeatThread = new Thread("BlockManager heartbeat") {
- setDaemon(true)
-
- override def run: Unit = {
- while (!shuttingDown) {
- heartBeat()
- try {
- Thread.sleep(heartBeatFrequency)
- } catch {
- case e: InterruptedException => {}
- }
- }
- }
- }
+ var heartBeatTask: Cancellable = null
initialize()
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(master: BlockManagerMaster, serializer: Serializer) = {
- this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
+ this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
@@ -149,7 +138,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
- heartBeatThread.start()
+ heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
+ heartBeat()
+ }
}
}
@@ -914,8 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
- shuttingDown = true
- heartBeatThread.interrupt()
+ if (heartBeatTask != null) {
+ heartBeatTask.cancel()
+ }
connectionManager.stop()
blockInfo.clear()
memoryStore.clear()
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index e4a5b8ffdf..5bb5a29cc4 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -74,7 +74,7 @@ private[spark] object ThreadingTest {
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
- val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+ val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 1491818140..ad2253596d 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -64,7 +64,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("manager-master interaction") {
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -95,7 +95,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -112,7 +112,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("reregistration on block update") {
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -133,14 +133,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("deregistration on duplicate") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
- store2 = new BlockManager(master, serializer, 2000)
+ store2 = new BlockManager(actorSystem, master, serializer, 2000)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
@@ -154,7 +154,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -173,7 +173,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -192,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -211,7 +211,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -234,7 +234,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -247,7 +247,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -262,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -277,7 +277,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -292,7 +292,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -307,7 +307,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -332,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -356,7 +356,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -402,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager(master, serializer, 500)
+ store = new BlockManager(actorSystem, master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -413,49 +413,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()