aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java42
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala55
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala22
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala39
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala206
5 files changed, 281 insertions, 83 deletions
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5875506179..33d5fc2d89 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -1,5 +1,12 @@
package spark;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
@@ -12,8 +19,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import scala.Tuple2;
-
import spark.api.java.JavaDoubleRDD;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
@@ -24,10 +29,6 @@ import spark.partial.PartialResult;
import spark.storage.StorageLevel;
import spark.util.StatCounter;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -44,6 +45,8 @@ public class JavaAPISuite implements Serializable {
public void tearDown() {
sc.stop();
sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
}
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
@@ -128,6 +131,17 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void lookup() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ Assert.assertEquals(2, categories.lookup("Oranges").size());
+ Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+ }
+
+ @Test
public void groupBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
@@ -381,7 +395,8 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0)).next().intValue());
+ TaskContext context = new TaskContext(0, 0, 0);
+ Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
}
@Test
@@ -553,4 +568,17 @@ public class JavaAPISuite implements Serializable {
}
}).collect().toString());
}
+
+ @Test
+ public void zip() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ @Override
+ public Double call(Integer x) {
+ return 1.0 * x;
+ }
+ });
+ JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+ zipped.count();
+ }
}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 4e9717d871..5b4b198960 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -2,10 +2,14 @@ package spark
import org.scalatest.FunSuite
+import akka.actor._
+import spark.scheduler.MapStatus
+import spark.storage.BlockManagerId
+
class MapOutputTrackerSuite extends FunSuite {
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
- assert(MapOutputTracker.compressSize(1L) === 0)
+ assert(MapOutputTracker.compressSize(1L) === 1)
assert(MapOutputTracker.compressSize(2L) === 8)
assert(MapOutputTracker.compressSize(10L) === 25)
assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
@@ -15,11 +19,58 @@ class MapOutputTrackerSuite extends FunSuite {
}
test("decompressSize") {
- assert(MapOutputTracker.decompressSize(0) === 1)
+ assert(MapOutputTracker.decompressSize(0) === 0)
for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
"size " + size + " decompressed to " + size2 + ", which is out of range")
}
}
+
+ test("master start and stop") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.stop()
+ }
+
+ test("master register and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize10000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000)))
+ val statuses = tracker.getServerStatuses(10, 0)
+ assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000),
+ (new BlockManagerId("hostB", 1000), size10000)))
+ tracker.stop()
+ }
+
+ test("master register and unregister and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize1000, compressedSize1000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000, compressedSize1000)))
+
+ // As if we had two simulatenous fetch failures
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+
+ // The remaining reduce task might try to grab the output dispite the shuffle failure;
+ // this should cause it to fail, and the scheduler will ignore the failure due to the
+ // stage already being aborted.
+ intercept[Exception] { tracker.getServerStatuses(10, 1) }
+ }
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 37a0ff0947..08da9a1c4d 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -8,9 +8,9 @@ import spark.rdd.CoalescedRDD
import SparkContext._
class RDDSuite extends FunSuite with BeforeAndAfter {
-
+
var sc: SparkContext = _
-
+
after {
if (sc != null) {
sc.stop()
@@ -19,11 +19,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
-
+
test("basic operations") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
+ val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
+ assert(dups.distinct.count === 4)
+ assert(dups.distinct().collect === dups.distinct.collect)
+ assert(dups.distinct(2).collect === dups.distinct.collect)
assert(nums.reduce(_ + _) === 10)
assert(nums.fold(0)(_ + _) === 10)
assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))
@@ -114,4 +118,16 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
}
+
+ test("zipped RDDs") {
+ sc = new SparkContext("local", "test")
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val zipped = nums.zip(nums.map(_ + 1.0))
+ assert(zipped.glom().map(_.toList).collect().toList ===
+ List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0))))
+
+ intercept[IllegalArgumentException] {
+ nums.zip(sc.parallelize(1 to 4, 1)).collect()
+ }
+ }
}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 7f8ec5d48f..8170100f1d 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -12,8 +12,8 @@ import org.scalacheck.Prop._
import com.google.common.io.Files
-import spark.rdd.ShuffledAggregatedRDD
-import SparkContext._
+import spark.rdd.ShuffledRDD
+import spark.SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
@@ -216,41 +216,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
-
- test("map-side combine") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2)
-
- // Test with map-side combine on.
- val sums = pairs.reduceByKey(_+_).collect()
- assert(sums.toSet === Set((1, 8), (2, 1)))
-
- // Turn off map-side combine and test the results.
- val aggregator = new Aggregator[Int, Int, Int](
- (v: Int) => v,
- _+_,
- _+_,
- false)
- val shuffledRdd = new ShuffledAggregatedRDD(
- pairs, aggregator, new HashPartitioner(2))
- assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
-
- // Turn map-side combine off and pass a wrong mergeCombine function. Should
- // not see an exception because mergeCombine should not have been called.
- val aggregatorWithException = new Aggregator[Int, Int, Int](
- (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
- val shuffledRdd1 = new ShuffledAggregatedRDD(
- pairs, aggregatorWithException, new HashPartitioner(2))
- assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
-
- // Now run the same mergeCombine function with map-side combine on. We
- // expect to see an exception thrown.
- val aggregatorWithException1 = new Aggregator[Int, Int, Int](
- (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
- val shuffledRdd2 = new ShuffledAggregatedRDD(
- pairs, aggregatorWithException1, new HashPartitioner(2))
- evaluating { shuffledRdd2.collect() } should produce [SparkException]
- }
}
object ShuffleSuite {
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index b9c19e61cd..8f86e3170e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -7,6 +7,10 @@ import akka.actor._
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers._
+import org.scalatest.time.SpanSugar._
import spark.KryoSerializer
import spark.SizeEstimator
@@ -14,21 +18,25 @@ import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
+ var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
var oldOops: String = null
-
- // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ var oldHeartBeat: String = null
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ System.setProperty("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
before {
actorSystem = ActorSystem("test")
- master = new BlockManagerMaster(actorSystem, true, true)
+ master = new BlockManagerMaster(actorSystem, true, true, "localhost", 7077)
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+ // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+ oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
@@ -36,6 +44,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
after {
if (store != null) {
store.stop()
+ store = null
+ }
+ if (store2 != null) {
+ store2.stop()
+ store2 = null
}
actorSystem.shutdown()
actorSystem.awaitTermination()
@@ -55,8 +68,34 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
- test("manager-master interaction") {
- store = new BlockManager(master, serializer, 2000)
+ test("StorageLevel object caching") {
+ val level1 = new StorageLevel(false, false, false, 3)
+ val level2 = new StorageLevel(false, false, false, 3)
+ val bytes1 = spark.Utils.serialize(level1)
+ val level1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = spark.Utils.serialize(level2)
+ val level2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ assert(level1_ === level1, "Deserialized level1 not same as original level1")
+ assert(level2_ === level2, "Deserialized level2 not same as original level1")
+ assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2")
+ assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1")
+ }
+
+ test("BlockManagerId object caching") {
+ val id1 = new StorageLevel(false, false, false, 3)
+ val id2 = new StorageLevel(false, false, false, 3)
+ val bytes1 = spark.Utils.serialize(id1)
+ val id1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = spark.Utils.serialize(id2)
+ val id2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ assert(id1_ === id1, "Deserialized id1 not same as original id1")
+ assert(id2_ === id2, "Deserialized id2 not same as original id1")
+ assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2")
+ assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized level1")
+ }
+
+ test("master + 1 manager interaction") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -66,27 +105,126 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
- // Checking whether blocks are in memory
+ // Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
- assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
- assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
-
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3").size === 0, "master was told about a3")
+
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
- assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
- assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+ assert(master.getLocations("a1").size === 0, "master did not remove a1")
+ assert(master.getLocations("a2").size === 0, "master did not remove a2")
+ }
+
+ test("master + 2 managers interaction") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+
+ val peers = master.getPeers(store.blockManagerId, 1)
+ assert(peers.size === 1, "master did not return the other manager as a peer")
+ assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
+
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+ store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+ assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
+ assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
+ }
+
+ test("removing block") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+
+ // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+ store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
+
+ // Checking whether blocks are in memory and memory size
+ val memStatus = master.getMemoryStatus.head._2
+ assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+ assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+ assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
+ assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
+ assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
+
+ // Checking whether master knows about the blocks or not
+ assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
+
+ // Remove a1 and a2 and a3. Should be no-op for a3.
+ master.removeBlock("a1-to-remove")
+ master.removeBlock("a2-to-remove")
+ master.removeBlock("a3-to-remove")
+
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a1-to-remove") should be (None)
+ master.getLocations("a1-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a2-to-remove") should be (None)
+ master.getLocations("a2-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a3-to-remove") should not be (None)
+ master.getLocations("a3-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ val memStatus = master.getMemoryStatus.head._2
+ memStatus._1 should equal (2000L)
+ memStatus._2 should equal (2000L)
+ }
+ }
+
+ test("reregistration on heart beat") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
+ store invokePrivate heartBeat()
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+ }
+
+ test("reregistration on block update") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
+ store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
}
test("in-memory LRU storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -103,9 +241,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
}
-
+
test("in-memory LRU storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -124,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -143,7 +281,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -166,7 +304,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -179,7 +317,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -194,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -209,7 +347,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -224,7 +362,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -239,7 +377,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -264,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -288,7 +426,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -334,7 +472,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager(master, serializer, 500)
+ store = new BlockManager(actorSystem, master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -345,49 +483,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()