aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-07-30 13:58:46 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-07-30 13:58:46 -0700
commit3ee2530c0c40f7670151f55c05232728a12c23e2 (patch)
tree3a45b58d66a82de02135ae28aa739089b4280e60
parent400221f851cfda884c47b9cae55f347438ae007d (diff)
parented1b0f8388ee9a1bdcd252c484bfcb292960bec4 (diff)
downloadspark-3ee2530c0c40f7670151f55c05232728a12c23e2.tar.gz
spark-3ee2530c0c40f7670151f55c05232728a12c23e2.tar.bz2
spark-3ee2530c0c40f7670151f55c05232728a12c23e2.zip
Merge branch 'block-manager-fix' into dev
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala32
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala36
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerWorker.scala6
-rw-r--r--core/src/main/scala/spark/storage/BlockMessage.scala20
-rw-r--r--core/src/main/scala/spark/storage/BlockMessageArray.scala6
-rw-r--r--core/src/main/scala/spark/storage/StorageLevel.scala23
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala40
9 files changed, 83 insertions, 88 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 25593c596b..694db6b2a3 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -31,7 +31,7 @@ class SparkEnv (
shuffleFetcher.stop()
shuffleManager.stop()
blockManager.stop()
- BlockManagerMaster.stopBlockManagerMaster()
+ blockManager.master.stop()
actorSystem.shutdown()
actorSystem.awaitTermination()
}
@@ -66,9 +66,9 @@ object SparkEnv {
val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer")
val serializer = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
- BlockManagerMaster.startBlockManagerMaster(actorSystem, isMaster, isLocal)
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
- val blockManager = new BlockManager(serializer)
+ val blockManager = new BlockManager(blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 436c16cddd..f7472971b5 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -498,7 +498,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (!deadHosts.contains(host)) {
logInfo("Host lost: " + host)
deadHosts += host
- BlockManagerMaster.notifyADeadHost(host)
+ env.blockManager.master.notifyADeadHost(host)
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnHost(host)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 9faa0e62f2..5067601198 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -66,8 +66,8 @@ class BlockLocker(numLockers: Int) {
}
-
-class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging {
+class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+ extends Logging {
case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
@@ -94,15 +94,16 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(serializer: Serializer) =
- this(BlockManager.getMaxMemoryFromSystemProperties(), serializer)
+ def this(master: BlockManagerMaster, serializer: Serializer) = {
+ this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ }
/**
* Initialize the BlockManager. Register to the BlockManagerMaster, and start the
* BlockManagerWorker actor.
*/
private def initialize() {
- BlockManagerMaster.mustRegisterBlockManager(
+ master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
}
@@ -154,7 +155,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
*/
def getLocations(blockId: String): Seq[String] = {
val startTimeMs = System.currentTimeMillis
- var managers = BlockManagerMaster.mustGetLocations(GetLocations(blockId))
+ var managers = master.mustGetLocations(GetLocations(blockId))
val locations = managers.map(_.ip)
logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
@@ -165,7 +166,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
*/
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
- val locations = BlockManagerMaster.mustGetLocationsMultipleBlockIds(
+ val locations = master.mustGetLocationsMultipleBlockIds(
GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
@@ -235,7 +236,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
logDebug("Getting remote block " + blockId)
// Get locations of block
- val locations = BlockManagerMaster.mustGetLocations(GetLocations(blockId))
+ val locations = master.mustGetLocations(GetLocations(blockId))
// Get block from remote locations
for (loc <- locations) {
@@ -321,8 +322,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
throw new BlockException(oneBlockId, "Unexpected message received from " + cmId)
}
- val buffer = blockMessage.getData()
- val blockId = blockMessage.getId()
+ val buffer = blockMessage.getData
+ val blockId = blockMessage.getId
val block = dataDeserialize(buffer)
blocks.update(blockId, Some(block))
logDebug("Got remote block " + blockId + " in " + Utils.getUsedTimeMs(startTime))
@@ -490,7 +491,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
- var peers = BlockManagerMaster.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ var peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
for (peer: BlockManagerId <- peers) {
val start = System.nanoTime
logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
@@ -564,7 +565,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
private def notifyMaster(heartBeat: HeartBeat) {
- BlockManagerMaster.mustHeartBeat(heartBeat)
+ master.mustHeartBeat(heartBeat)
}
def stop() {
@@ -576,12 +577,9 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
}
}
-
-object BlockManager extends Logging {
+object BlockManager {
def getMaxMemoryFromSystemProperties(): Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
- val bytes = (Runtime.getRuntime.totalMemory * memoryFraction).toLong
- logInfo("Maximum memory to use: " + bytes)
- bytes
+ (Runtime.getRuntime.totalMemory * memoryFraction).toLong
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 97a5b0cb45..9f03c5a32c 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -85,7 +85,7 @@ case class RemoveHost(host: String) extends ToBlockManagerMaster
case object StopBlockManagerMaster extends ToBlockManagerMaster
-class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
+class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
timeMs: Long,
@@ -134,19 +134,19 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
}
}
- def getLastSeenMs(): Long = {
+ def getLastSeenMs: Long = {
return lastSeenMs
}
- def getRemainedMem(): Long = {
+ def getRemainedMem: Long = {
return remainedMem
}
- def getRemainedDisk(): Long = {
+ def getRemainedDisk: Long = {
return remainedDisk
}
- override def toString(): String = {
+ override def toString: String = {
return "BlockManagerInfo " + timeMs + " " + remainedMem + " " + remainedDisk
}
@@ -329,8 +329,8 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
}
}
-object BlockManagerMaster extends Logging {
- initLogging()
+class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
+ extends Logging {
val AKKA_ACTOR_NAME: String = "BlockMasterManager"
val REQUEST_RETRY_INTERVAL_MS = 100
@@ -342,20 +342,18 @@ object BlockManagerMaster extends Logging {
val timeout = 10.seconds
var masterActor: ActorRef = null
- def startBlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean) {
- if (isMaster) {
- masterActor = actorSystem.actorOf(
- Props(new BlockManagerMaster(isLocal)), name = AKKA_ACTOR_NAME)
- logInfo("Registered BlockManagerMaster Actor")
- } else {
- val url = "akka://spark@%s:%s/user/%s".format(
- DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
- logInfo("Connecting to BlockManagerMaster: " + url)
- masterActor = actorSystem.actorFor(url)
- }
+ if (isMaster) {
+ masterActor = actorSystem.actorOf(
+ Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME)
+ logInfo("Registered BlockManagerMaster Actor")
+ } else {
+ val url = "akka://spark@%s:%s/user/%s".format(
+ DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
+ logInfo("Connecting to BlockManagerMaster: " + url)
+ masterActor = actorSystem.actorFor(url)
}
- def stopBlockManagerMaster() {
+ def stop() {
if (masterActor != null) {
communicate(StopBlockManagerMaster)
masterActor = null
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 501183ab1f..c61e280252 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -48,15 +48,15 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
}
def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
- blockMessage.getType() match {
+ blockMessage.getType match {
case BlockMessage.TYPE_PUT_BLOCK => {
- val pB = PutBlock(blockMessage.getId(), blockMessage.getData(), blockMessage.getLevel())
+ val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
logInfo("Received [" + pB + "]")
putBlock(pB.id, pB.data, pB.level)
return None
}
case BlockMessage.TYPE_GET_BLOCK => {
- val gB = new GetBlock(blockMessage.getId())
+ val gB = new GetBlock(blockMessage.getId)
logInfo("Received [" + gB + "]")
val buffer = getBlock(gB.id)
if (buffer == null) {
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index bb128dce7a..0b2ed69e07 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -102,23 +102,23 @@ class BlockMessage() extends Logging{
set(buffer)
}
- def getType(): Int = {
+ def getType: Int = {
return typ
}
- def getId(): String = {
+ def getId: String = {
return id
}
- def getData(): ByteBuffer = {
+ def getData: ByteBuffer = {
return data
}
- def getLevel(): StorageLevel = {
+ def getLevel: StorageLevel = {
return level
}
- def toBufferMessage(): BufferMessage = {
+ def toBufferMessage: BufferMessage = {
val startTime = System.currentTimeMillis
val buffers = new ArrayBuffer[ByteBuffer]()
var buffer = ByteBuffer.allocate(4 + 4 + id.length() * 2)
@@ -128,7 +128,7 @@ class BlockMessage() extends Logging{
buffers += buffer
if (typ == BlockMessage.TYPE_PUT_BLOCK) {
- buffer = ByteBuffer.allocate(8).putInt(level.toInt()).putInt(level.replication)
+ buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication)
buffer.flip()
buffers += buffer
@@ -164,7 +164,7 @@ class BlockMessage() extends Logging{
return Message.createBufferMessage(buffers)
}
- override def toString(): String = {
+ override def toString: String = {
"BlockMessage [type = " + typ + ", id = " + id + ", level = " + level +
", data = " + (if (data != null) data.remaining.toString else "null") + "]"
}
@@ -209,11 +209,11 @@ object BlockMessage {
def main(args: Array[String]) {
val B = new BlockMessage()
B.set(new PutBlock("ABC", ByteBuffer.allocate(10), StorageLevel.DISK_AND_MEMORY_2))
- val bMsg = B.toBufferMessage()
+ val bMsg = B.toBufferMessage
val C = new BlockMessage()
C.set(bMsg)
- println(B.getId() + " " + B.getLevel())
- println(C.getId() + " " + C.getLevel())
+ println(B.getId + " " + B.getLevel)
+ println(C.getId + " " + C.getLevel)
}
}
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index 5f411d3488..a108ab653e 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -123,13 +123,13 @@ object BlockMessageArray {
val newBlockMessageArray = BlockMessageArray.fromBufferMessage(newBufferMessage)
println("Converted back to block message array")
newBlockMessageArray.foreach(blockMessage => {
- blockMessage.getType() match {
+ blockMessage.getType match {
case BlockMessage.TYPE_PUT_BLOCK => {
- val pB = PutBlock(blockMessage.getId(), blockMessage.getData(), blockMessage.getLevel())
+ val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
println(pB)
}
case BlockMessage.TYPE_GET_BLOCK => {
- val gB = new GetBlock(blockMessage.getId())
+ val gB = new GetBlock(blockMessage.getId)
println(gB)
}
}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index 693a679c4e..f067a2a6c5 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -11,11 +11,8 @@ class StorageLevel(
// TODO: Also add fields for caching priority, dataset ID, and flushing.
- def this(booleanInt: Int, replication: Int) {
- this(((booleanInt & 4) != 0),
- ((booleanInt & 2) != 0),
- ((booleanInt & 1) != 0),
- replication)
+ def this(flags: Int, replication: Int) {
+ this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
def this() = this(false, true, false) // For deserialization
@@ -33,25 +30,25 @@ class StorageLevel(
false
}
- def isValid() = ((useMemory || useDisk) && (replication > 0))
+ def isValid = ((useMemory || useDisk) && (replication > 0))
- def toInt(): Int = {
+ def toInt: Int = {
var ret = 0
if (useDisk) {
- ret += 4
+ ret |= 4
}
if (useMemory) {
- ret += 2
+ ret |= 2
}
if (deserialized) {
- ret += 1
+ ret |= 1
}
return ret
}
override def writeExternal(out: ObjectOutput) {
- out.writeByte(toInt().toByte)
- out.writeByte(replication.toByte)
+ out.writeByte(toInt)
+ out.writeByte(replication)
}
override def readExternal(in: ObjectInput) {
@@ -62,7 +59,7 @@ class StorageLevel(
replication = in.readByte()
}
- override def toString(): String =
+ override def toString: String =
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 1ed5519d37..61decd81e6 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -5,27 +5,29 @@ import java.nio.ByteBuffer
import akka.actor._
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfterEach
+import org.scalatest.BeforeAndAfter
import spark.KryoSerializer
import spark.util.ByteBufferInputStream
-class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+class BlockManagerSuite extends FunSuite with BeforeAndAfter {
var actorSystem: ActorSystem = null
+ var master: BlockManagerMaster = null
- override def beforeEach() {
+ before {
actorSystem = ActorSystem("test")
- BlockManagerMaster.startBlockManagerMaster(actorSystem, true, true)
+ master = new BlockManagerMaster(actorSystem, true, true)
}
- override def afterEach() {
+ after {
actorSystem.shutdown()
actorSystem.awaitTermination()
actorSystem = null
+ master = null
}
test("manager-master interaction") {
- val store = new BlockManager(2000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -41,21 +43,21 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(BlockManagerMaster.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
- assert(BlockManagerMaster.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
- assert(BlockManagerMaster.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
+ assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
// Setting storage level of a1 and a2 to invalid; they should be removed from store and master
store.setLevel("a1", new StorageLevel(false, false, false, 1))
store.setLevel("a2", new StorageLevel(true, false, false, 0))
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
- assert(BlockManagerMaster.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
- assert(BlockManagerMaster.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+ assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
+ assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
}
test("in-memory LRU storage") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -76,7 +78,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
}
test("in-memory LRU storage with serialization") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -97,7 +99,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
}
test("on-disk storage") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -110,7 +112,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
}
test("disk and memory storage") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -124,7 +126,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
}
test("disk and memory storage with serialization") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -138,7 +140,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
}
test("LRU with mixed storage levels") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -164,7 +166,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
}
test("in-memory LRU with streams") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -190,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
}
test("LRU with mixed storage levels and streams") {
- val store = new BlockManager(1000, new KryoSerializer)
+ val store = new BlockManager(master, new KryoSerializer, 1000)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))