From efc5423210d1aadeaea78273a4a8f10425753079 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 7 Oct 2012 11:30:53 -0700 Subject: Made compression configurable separately for shuffle, broadcast and RDDs --- .../scala/spark/storage/BlockManagerSuite.scala | 96 ++++++++++++++++++---- 1 file changed, 80 insertions(+), 16 deletions(-) (limited to 'core/src/test/scala') diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 213a423fef..31b33eae09 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -13,10 +13,14 @@ import spark.SizeEstimator import spark.util.ByteBufferInputStream class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { + var store: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null - var oldArch: String = _ - var oldOops: String = _ + var oldArch: String = null + var oldOops: String = null + + // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + val serializer = new KryoSerializer before { actorSystem = ActorSystem("test") @@ -30,6 +34,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } after { + if (store != null) { + store.stop() + } actorSystem.shutdown() actorSystem.awaitTermination() actorSystem = null @@ -49,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("manager-master interaction") { - val store = new BlockManager(master, new KryoSerializer, 2000) + store = new BlockManager(master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -79,7 +86,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -98,7 +105,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -117,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of same RDD") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -136,7 +143,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -159,7 +166,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -172,7 +179,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -187,7 +194,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -202,7 +209,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -217,7 +224,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -232,7 +239,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -257,7 +264,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -281,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -327,11 +334,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - val store = new BlockManager(master, new KryoSerializer, 500) + store = new BlockManager(master, serializer, 500) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") assert(store.getSingle("a2") != None, "a2 was not in store") } + + test("block compression") { + try { + System.setProperty("spark.shuffle.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.shuffle.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") + store.stop() + store = null + + // Check that any other block types are also kept uncompressed + store = new BlockManager(master, serializer, 2000) + store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") + store.stop() + store = null + } finally { + System.clearProperty("spark.shuffle.compress") + System.clearProperty("spark.broadcast.compress") + System.clearProperty("spark.rdd.compress") + } + } } -- cgit v1.2.3