aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-07 11:30:53 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-07 11:30:53 -0700
commitefc5423210d1aadeaea78273a4a8f10425753079 (patch)
tree86e0ca94f41ef5a17b92be3d7be45b77f762e1f8 /core/src/test/scala
parent039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3 (diff)
downloadspark-efc5423210d1aadeaea78273a4a8f10425753079.tar.gz
spark-efc5423210d1aadeaea78273a4a8f10425753079.tar.bz2
spark-efc5423210d1aadeaea78273a4a8f10425753079.zip
Made compression configurable separately for shuffle, broadcast and RDDs
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala96
1 files changed, 80 insertions, 16 deletions
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 213a423fef..31b33eae09 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -13,10 +13,14 @@ import spark.SizeEstimator
import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+ var store: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
- var oldArch: String = _
- var oldOops: String = _
+ var oldArch: String = null
+ var oldOops: String = null
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ val serializer = new KryoSerializer
before {
actorSystem = ActorSystem("test")
@@ -30,6 +34,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
after {
+ if (store != null) {
+ store.stop()
+ }
actorSystem.shutdown()
actorSystem.awaitTermination()
actorSystem = null
@@ -49,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("manager-master interaction") {
- val store = new BlockManager(master, new KryoSerializer, 2000)
+ store = new BlockManager(master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -79,7 +86,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -98,7 +105,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -117,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -136,7 +143,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -159,7 +166,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -172,7 +179,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -187,7 +194,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -202,7 +209,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -217,7 +224,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -232,7 +239,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -257,7 +264,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -281,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -327,11 +334,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- val store = new BlockManager(master, new KryoSerializer, 500)
+ store = new BlockManager(master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
assert(store.getSingle("a2") != None, "a2 was not in store")
}
+
+ test("block compression") {
+ try {
+ System.setProperty("spark.shuffle.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.shuffle.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.broadcast.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.broadcast.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.rdd.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.rdd.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
+ store.stop()
+ store = null
+
+ // Check that any other block types are also kept uncompressed
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+ assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+ store.stop()
+ store = null
+ } finally {
+ System.clearProperty("spark.shuffle.compress")
+ System.clearProperty("spark.broadcast.compress")
+ System.clearProperty("spark.rdd.compress")
+ }
+ }
}