Diffstat (limited to 'core/src/test')
20 files changed, 783 insertions, 103 deletions
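Nearly every suite below gains the same teardown: stop the SparkContext, null the field, and clear spark.master.port so the next suite isn't bitten by Akka's delayed port unbind. As a minimal sketch of how that repetition could be factored into a shared mixin (a hypothetical helper, not part of this commit; later Spark versions added a LocalSparkContext trait along these lines):

package spark

import org.scalatest.{BeforeAndAfterEach, Suite}

// Hypothetical trait mirroring the after {} blocks added throughout this commit
trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def afterEach() {
    if (sc != null) {
      sc.stop()
      sc = null
    }
    // Akka doesn't unbind from its port immediately on shutdown, so clear the
    // property to keep the next test from trying to rebind to the same port
    System.clearProperty("spark.master.port")
    super.afterEach()
  }
}

Each suite would then mix this in instead of repeating the block by hand.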
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 02fe16866e..4c99e450bc 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -1,8 +1,10 @@ # Set everything to be logged to a file -log4j.rootCategory=WARN, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=spark-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN
diff --git a/core/src/test/resources/uncommons-maths-1.2.2.jar b/core/src/test/resources/uncommons-maths-1.2.2.jar Binary files differ new file mode 100644 index 0000000000..e126001c1c --- /dev/null +++ b/core/src/test/resources/uncommons-maths-1.2.2.jar
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala index d55969c261..d8be99dde7 100644 --- a/core/src/test/scala/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/spark/AccumulatorSuite.scala @@ -18,6 +18,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test ("basic accumulation"){ @@ -53,10 +55,11 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } sc.stop() sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } } - implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] { def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = { t1 ++= t2 @@ -71,7 +74,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } } - test ("value not readable in tasks") { import SetAccum._ val maxI = 1000 @@ -86,6 +88,54 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } should produce [SparkException] sc.stop() sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + } + + test ("collection accumulators") { + val maxI = 1000 + for (nThreads <- List(1, 10)) { + // test single & multi-threaded + sc = new SparkContext("local[" + nThreads + "]", "test") + val setAcc = sc.accumulableCollection(mutable.HashSet[Int]()) + val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]()) + val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]()) + val d = sc.parallelize((1 to maxI) ++ (1 to maxI)) + d.foreach { + x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)} + } + + // Note that this is typed correctly -- no casts necessary + setAcc.value.size should be (maxI) + bufferAcc.value.size should be (2 * maxI) + mapAcc.value.size should be (maxI) + for (i <- 1 to maxI) { + setAcc.value should contain(i) + bufferAcc.value should contain(i) + mapAcc.value should contain (i -> i.toString) + } + sc.stop() + sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + } + + test ("localValue readable in tasks") { + import SetAccum._ + val maxI = 1000 + for (nThreads <- List(1, 10)) { //test single & multi-threaded + sc = new SparkContext("local[" + nThreads + "]", "test") + val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) + val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet} + val d = sc.parallelize(groupedInts) + d.foreach { + x => acc.localValue ++= x + } + acc.value should be ( (0 to maxI).toSet) + sc.stop() + sc = null } }
diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala index dff2970566..37cafd1e8e 100644 --- a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala +++ b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala @@ -2,8 +2,10 @@ package spark import org.scalatest.FunSuite import org.scalatest.PrivateMethodTester +import org.scalatest.matchers.ShouldMatchers -class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester { +// TODO: Replace this with a test of MemoryStore +class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester with ShouldMatchers { test("constructor test") { val cache = new BoundedMemoryCache(60) expect(60)(cache.getCapacity) @@ -22,15 +24,21 @@ class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester { logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size)) } } + + // NOTE: The String class definition changed in JDK 7 to exclude the int fields offset and count. + // This means that the size of strings is smaller by 8 bytes in JDK 7 compared to JDK 6. + // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html + // Work around this by accepting either size.
+ //should be OK - expect(CachePutSuccess(56))(cache.put("1", 0, "Meh")) + cache.put("1", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48))) //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from //cache because it's from the same dataset expect(CachePutFailure())(cache.put("1", 1, "Meh")) //should be OK, dataset '1' can be evicted from cache - expect(CachePutSuccess(56))(cache.put("2", 0, "Meh")) + cache.put("2", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48))) //should fail, cache should obey its capacity expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala index 0738a2725b..2d3302f0aa 100644 --- a/core/src/test/scala/spark/BroadcastSuite.scala +++ b/core/src/test/scala/spark/BroadcastSuite.scala @@ -12,6 +12,8 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic broadcast") {
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala index 426c0d26e9..467605981b 100644 --- a/core/src/test/scala/spark/CacheTrackerSuite.scala +++ b/core/src/test/scala/spark/CacheTrackerSuite.scala @@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite { } catch { case e: Exception => throw new SparkException("Error communicating with actor", e) - } + } } test("CacheTrackerActor slave initialization & cache status") {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala new file mode 100644 index 0000000000..cacc2796b6 --- /dev/null +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -0,0 +1,191 @@ +package spark + +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import com.google.common.io.Files + +import scala.collection.mutable.ArrayBuffer + +import SparkContext._ +import storage.StorageLevel + +class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { + + val clusterUrl = "local-cluster[2,1,512]" + + @transient var sc: SparkContext = _ + + after { + if (sc != null) { + sc.stop() + sc = null + } + System.clearProperty("spark.reducer.maxMbInFlight") + System.clearProperty("spark.storage.memoryFraction") + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + + test("local-cluster format") { + sc = new SparkContext("local-cluster[2,1,512]", "test") + assert(sc.parallelize(1 to 2, 2).count() == 2) + sc.stop() + System.clearProperty("spark.master.port") + sc = new SparkContext("local-cluster[2 , 1 , 512]", "test") + assert(sc.parallelize(1 to 2, 2).count() == 2) + sc.stop() + System.clearProperty("spark.master.port") + sc = new SparkContext("local-cluster[2, 1, 512]", "test") + assert(sc.parallelize(1 to 2, 2).count() == 2) + sc.stop() + System.clearProperty("spark.master.port") + sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test") + assert(sc.parallelize(1 to 2, 2).count() == 2) + sc.stop() + System.clearProperty("spark.master.port") + sc = null + } + +
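An aside on the "local-cluster format" test above: it only pins down that embedded whitespace in the master URL is tolerated. A regex along the following lines would satisfy it (an illustrative sketch, not the actual SparkContext parser, whose details may differ):

// Hypothetical parser for local-cluster[numSlaves, coresPerSlave, memoryPerSlaveMB]
val LocalCluster = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r

def parseLocalCluster(master: String): Option[(Int, Int, Int)] = master match {
  case LocalCluster(slaves, cores, memoryMb) =>
    Some((slaves.toInt, cores.toInt, memoryMb.toInt))
  case _ => None
}

// parseLocalCluster("local-cluster[ 2, 1, 512 ]") == Some((2, 1, 512))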
test("simple groupByKey") { + sc = new SparkContext(clusterUrl, "test") + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5) + val groups = pairs.groupByKey(5).collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey where map output sizes exceed maxMbInFlight") { + System.setProperty("spark.reducer.maxMbInFlight", "1") + sc = new SparkContext(clusterUrl, "test") + // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output + // file should be about 2.5 MB + val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000))) + val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect() + assert(groups.length === 16) + assert(groups.map(_._2).sum === 2000) + // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block + } + + test("accumulators") { + sc = new SparkContext(clusterUrl, "test") + val accum = sc.accumulator(0) + sc.parallelize(1 to 10, 10).foreach(x => accum += x) + assert(accum.value === 55) + } + + test("broadcast variables") { + sc = new SparkContext(clusterUrl, "test") + val array = new Array[Int](100) + val bv = sc.broadcast(array) + array(2) = 3 // Change the array -- this should not be seen on workers + val rdd = sc.parallelize(1 to 10, 10) + val sum = rdd.map(x => bv.value.sum).reduce(_ + _) + assert(sum === 0) + } + + test("repeatedly failing task") { + sc = new SparkContext(clusterUrl, "test") + val accum = sc.accumulator(0) + val thrown = intercept[SparkException] { + sc.parallelize(1 to 10, 10).foreach(x => println(x / 0)) + } + assert(thrown.getClass === classOf[SparkException]) + assert(thrown.getMessage.contains("more than 4 times")) + } + + test("caching") { + sc = new SparkContext(clusterUrl, "test") + val data = sc.parallelize(1 to 1000, 10).cache() + assert(data.count() === 1000) + assert(data.count() === 1000) + assert(data.count() === 1000) + } + + test("caching on disk") { + sc = new SparkContext(clusterUrl, "test") + val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY) + assert(data.count() === 1000) + assert(data.count() === 1000) + assert(data.count() === 1000) + } + + test("caching in memory, replicated") { + sc = new SparkContext(clusterUrl, "test") + val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2) + assert(data.count() === 1000) + assert(data.count() === 1000) + assert(data.count() === 1000) + } + + test("caching in memory, serialized, replicated") { + sc = new SparkContext(clusterUrl, "test") + val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2) + assert(data.count() === 1000) + assert(data.count() === 1000) + assert(data.count() === 1000) + } + + test("caching on disk, replicated") { + sc = new SparkContext(clusterUrl, "test") + val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2) + assert(data.count() === 1000) + assert(data.count() === 1000) + assert(data.count() === 1000) + } + + test("caching in memory and disk, replicated") { + sc = new SparkContext(clusterUrl, "test") + val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2) + assert(data.count() === 1000) + assert(data.count() === 1000) + assert(data.count() === 1000) + } + + test("caching in memory and disk, serialized, replicated") { + sc = new 
SparkContext(clusterUrl, "test") + val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2) + assert(data.count() === 1000) + assert(data.count() === 1000) + assert(data.count() === 1000) + } + + test("compute without caching when no partitions fit in memory") { + System.setProperty("spark.storage.memoryFraction", "0.0001") + sc = new SparkContext(clusterUrl, "test") + // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache + // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory + val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER) + assert(data.count() === 4000000) + assert(data.count() === 4000000) + assert(data.count() === 4000000) + System.clearProperty("spark.storage.memoryFraction") + } + + test("compute when only some partitions fit in memory") { + System.setProperty("spark.storage.memoryFraction", "0.01") + sc = new SparkContext(clusterUrl, "test") + // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache + // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions + // to make sure that *some* of them do fit though + val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER) + assert(data.count() === 4000000) + assert(data.count() === 4000000) + assert(data.count() === 4000000) + System.clearProperty("spark.storage.memoryFraction") + } + + test("passing environment variables to cluster") { + sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE")) + val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect() + assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE")) + } +} diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala index 4405829161..a3454f25f6 100644 --- a/core/src/test/scala/spark/FailureSuite.scala +++ b/core/src/test/scala/spark/FailureSuite.scala @@ -32,6 +32,8 @@ class FailureSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } // Run a 3-task map job in which task 1 deterministically fails once, and check diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala new file mode 100644 index 0000000000..b4283d9604 --- /dev/null +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -0,0 +1,110 @@ +package spark + +import com.google.common.io.Files +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter +import java.io.{File, PrintWriter, FileReader, BufferedReader} +import SparkContext._ + +class FileServerSuite extends FunSuite with BeforeAndAfter { + + @transient var sc: SparkContext = _ + @transient var tmpFile : File = _ + @transient var testJarFile : File = _ + + before { + // Create a sample text file + val tmpdir = new File(Files.createTempDir(), "test") + tmpdir.mkdir() + tmpFile = new File(tmpdir, "FileServerSuite.txt") + val pw = new PrintWriter(tmpFile) + pw.println("100") + pw.close() + } + + after { + if (sc != null) { + sc.stop() + sc = null + } + // Clean up downloaded file + if (tmpFile.exists) { + tmpFile.delete() + } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + + test("Distributing files locally") { + sc = new 
SparkContext("local[4]", "test") + sc.addFile(tmpFile.toString) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val result = sc.parallelize(testData).reduceByKey { + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) + val fileVal = in.readLine().toInt + in.close() + _ * fileVal + _ * fileVal + }.collect + assert(result.toSet === Set((1,200), (2,300), (3,500))) + } + + test("Distributing files locally using URL as input") { + // addFile("file:///....") + sc = new SparkContext("local[4]", "test") + sc.addFile((new File(tmpFile.toString)).toURL.toString) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val result = sc.parallelize(testData).reduceByKey { + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) + val fileVal = in.readLine().toInt + in.close() + _ * fileVal + _ * fileVal + }.collect + assert(result.toSet === Set((1,200), (2,300), (3,500))) + } + + test ("Dynamically adding JARS locally") { + sc = new SparkContext("local[4]", "test") + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() + sc.addJar(sampleJarFile) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) + val result = sc.parallelize(testData).reduceByKey { (x,y) => + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) + val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + a + b + }.collect() + assert(result.toSet === Set((1,2), (2,7), (3,121))) + } + + test("Distributing files on a standalone cluster") { + sc = new SparkContext("local-cluster[1,1,512]", "test") + sc.addFile(tmpFile.toString) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val result = sc.parallelize(testData).reduceByKey { + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) + val fileVal = in.readLine().toInt + in.close() + _ * fileVal + _ * fileVal + }.collect + println(result) + assert(result.toSet === Set((1,200), (2,300), (3,500))) + } + + test ("Dynamically adding JARS on a standalone cluster") { + sc = new SparkContext("local-cluster[1,1,512]", "test") + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() + sc.addJar(sampleJarFile) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) + val result = sc.parallelize(testData).reduceByKey { (x,y) => + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) + val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + a + b + }.collect() + assert(result.toSet === Set((1,2), (2,7), (3,121))) + } +} diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala index 17c7a8de43..554bea53a9 100644 --- a/core/src/test/scala/spark/FileSuite.scala +++ b/core/src/test/scala/spark/FileSuite.scala @@ -1,6 +1,6 @@ package spark -import java.io.File +import java.io.{FileWriter, PrintWriter, File} import scala.io.Source @@ -20,6 +20,8 @@ class FileSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } 
test("text files") { @@ -142,4 +144,18 @@ class FileSuite extends FunSuite with BeforeAndAfter { sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir) assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)")) } + + test("file caching") { + sc = new SparkContext("local", "test") + val tempDir = Files.createTempDir() + val out = new FileWriter(tempDir + "/input") + out.write("Hello world!\n") + out.write("What's up?\n") + out.write("Goodbye\n") + out.close() + val rdd = sc.textFile(tempDir + "/input").cache() + assert(rdd.count() === 3) + assert(rdd.count() === 3) + assert(rdd.count() === 3) + } } diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 24bf021710..5875506179 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -112,15 +112,19 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2)); } + static int foreachCalls = 0; + @Test public void foreach() { + foreachCalls = 0; JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); rdd.foreach(new VoidFunction<String>() { @Override public void call(String s) { - System.out.println(s); + foreachCalls++; } }); + Assert.assertEquals(2, foreachCalls); } @Test diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala new file mode 100644 index 0000000000..4e9717d871 --- /dev/null +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -0,0 +1,25 @@ +package spark + +import org.scalatest.FunSuite + +class MapOutputTrackerSuite extends FunSuite { + test("compressSize") { + assert(MapOutputTracker.compressSize(0L) === 0) + assert(MapOutputTracker.compressSize(1L) === 0) + assert(MapOutputTracker.compressSize(2L) === 8) + assert(MapOutputTracker.compressSize(10L) === 25) + assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145) + assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218) + // This last size is bigger than we can encode in a byte, so check that we just return 255 + assert((MapOutputTracker.compressSize(1000000000000000000L) & 0xFF) === 255) + } + + test("decompressSize") { + assert(MapOutputTracker.decompressSize(0) === 1) + for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) { + val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size)) + assert(size2 >= 0.99 * size && size2 <= 1.11 * size, + "size " + size + " decompressed to " + size2 + ", which is out of range") + } + } +} diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index 5000fa7307..3dadc7acec 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -16,6 +16,8 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index 426652dc15..9b84b29227 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -13,6 +13,8 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to 
the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic pipe") { @@ -22,7 +24,6 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { val piped = nums.pipe(Seq("cat")) val c = piped.collect() - println(c.toSeq) assert(c.size === 4) assert(c(0) === "1") assert(c(1) === "2") diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index ba9b36adb7..37a0ff0947 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -3,6 +3,8 @@ package spark import scala.collection.mutable.HashMap import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter + +import spark.rdd.CoalescedRDD import SparkContext._ class RDDSuite extends FunSuite with BeforeAndAfter { @@ -14,6 +16,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic operations") { @@ -29,6 +33,11 @@ class RDDSuite extends FunSuite with BeforeAndAfter { assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4))) val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _))) assert(partitionSums.collect().toList === List(3, 7)) + + val partitionSumsWithSplit = nums.mapPartitionsWithSplit { + case(split, iter) => Iterator((split, iter.reduceLeft(_ + _))) + } + assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7))) } test("SparkContext.union") { @@ -66,4 +75,43 @@ class RDDSuite extends FunSuite with BeforeAndAfter { val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint() assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)) } + + test("basic caching") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + assert(rdd.collect().toList === List(1, 2, 3, 4)) + assert(rdd.collect().toList === List(1, 2, 3, 4)) + assert(rdd.collect().toList === List(1, 2, 3, 4)) + } + + test("coalesced RDDs") { + sc = new SparkContext("local", "test") + val data = sc.parallelize(1 to 10, 10) + + val coalesced1 = new CoalescedRDD(data, 2) + assert(coalesced1.collect().toList === (1 to 10).toList) + assert(coalesced1.glom().collect().map(_.toList).toList === + List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))) + + // Check that the narrow dependency is also specified correctly + assert(coalesced1.dependencies.head.getParents(0).toList === List(0, 1, 2, 3, 4)) + assert(coalesced1.dependencies.head.getParents(1).toList === List(5, 6, 7, 8, 9)) + + val coalesced2 = new CoalescedRDD(data, 3) + assert(coalesced2.collect().toList === (1 to 10).toList) + assert(coalesced2.glom().collect().map(_.toList).toList === + List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))) + + val coalesced3 = new CoalescedRDD(data, 10) + assert(coalesced3.collect().toList === (1 to 10).toList) + assert(coalesced3.glom().collect().map(_.toList).toList === + (1 to 10).map(x => List(x)).toList) + + // If we try to coalesce into more partitions than the original RDD, it should just + // keep the original number of partitions. 
+ val coalesced4 = new CoalescedRDD(data, 20) + assert(coalesced4.collect().toList === (1 to 10).toList) + assert(coalesced4.glom().collect().map(_.toList).toList === + (1 to 10).map(x => List(x)).toList) + } } diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 99d13b31ef..8170100f1d 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -1,7 +1,10 @@ package spark +import scala.collection.mutable.ArrayBuffer + import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter +import org.scalatest.matchers.ShouldMatchers import org.scalatest.prop.Checkers import org.scalacheck.Arbitrary._ import org.scalacheck.Gen @@ -9,21 +12,22 @@ import org.scalacheck.Prop._ import com.google.common.io.Files -import scala.collection.mutable.ArrayBuffer +import spark.rdd.ShuffledRDD +import spark.SparkContext._ -import SparkContext._ +class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { -class ShuffleSuite extends FunSuite with BeforeAndAfter { - var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } - + test("groupByKey") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -56,7 +60,7 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter { val valuesFor2 = groups.find(_._1 == 2).get._2 assert(valuesFor2.toList.sorted === List(1)) } - + test("groupByKey with many output partitions") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -68,6 +72,22 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter { assert(valuesFor2.toList.sorted === List(1)) } + test("groupByKey with compression") { + try { + System.setProperty("spark.blockManager.compress", "true") + sc = new SparkContext("local", "test") + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) + val groups = pairs.groupByKey(4).collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } finally { + System.setProperty("spark.blockManager.compress", "false") + } + } + test("reduceByKey") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) @@ -186,7 +206,7 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter { (4, (ArrayBuffer(), ArrayBuffer('w'))) )) } - + test("zero-partition RDD") { sc = new SparkContext("local", "test") val emptyDir = Files.createTempDir() @@ -194,6 +214,13 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter { assert(file.splits.size == 0) assert(file.collect().toList === Nil) // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + } +} + +object ShuffleSuite { + def mergeCombineException(x: Int, y: Int): Int = { + throw new SparkException("Exception for map-side combine.") + x + y } } diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala index a2015644ee..17f366212b 100644 --- 
a/core/src/test/scala/spark/SizeEstimatorSuite.scala +++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala @@ -3,6 +3,7 @@ package spark import org.scalatest.FunSuite import org.scalatest.BeforeAndAfterAll import org.scalatest.PrivateMethodTester +import org.scalatest.matchers.ShouldMatchers class DummyClass1 {} @@ -19,7 +20,9 @@ class DummyClass4(val d: DummyClass3) { val x: Int = 0 } -class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { +class SizeEstimatorSuite + extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers { + var oldArch: String = _ var oldOops: String = _ @@ -42,11 +45,15 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) } + // NOTE: The String class definition changed in JDK 7 to exclude the int fields offset and count. + // This means that the size of strings is smaller by 8 bytes in JDK 7 compared to JDK 6. + // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html + // Work around this by accepting either size. test("strings") { - expect(48)(SizeEstimator.estimate("")) - expect(56)(SizeEstimator.estimate("a")) - expect(56)(SizeEstimator.estimate("ab")) - expect(64)(SizeEstimator.estimate("abcdefgh")) + SizeEstimator.estimate("") should (equal (48) or equal (40)) + SizeEstimator.estimate("a") should (equal (56) or equal (48)) + SizeEstimator.estimate("ab") should (equal (56) or equal (48)) + SizeEstimator.estimate("abcdefgh") should (equal(64) or equal(56)) } test("primitive arrays") { @@ -106,6 +113,10 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet resetOrClear("os.arch", arch) } + // NOTE: The String class definition changed in JDK 7 to exclude the int fields offset and count. + // This means that the size of strings is smaller by 8 bytes in JDK 7 compared to JDK 6. + // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html + // Work around this by accepting either size.
test("64-bit arch with no compressed oops") { val arch = System.setProperty("os.arch", "amd64") val oops = System.setProperty("spark.test.useCompressedOops", "false") @@ -113,10 +124,10 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - expect(64)(SizeEstimator.estimate("")) - expect(72)(SizeEstimator.estimate("a")) - expect(72)(SizeEstimator.estimate("ab")) - expect(80)(SizeEstimator.estimate("abcdefgh")) + SizeEstimator.estimate("") should (equal (64) or equal (56)) + SizeEstimator.estimate("a") should (equal (72) or equal (64)) + SizeEstimator.estimate("ab") should (equal (72) or equal (64)) + SizeEstimator.estimate("abcdefgh") should (equal (80) or equal (72)) resetOrClear("os.arch", arch) resetOrClear("spark.test.useCompressedOops", oops) diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index 8fa1442a4d..1ad11ff4c3 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -12,12 +12,15 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with after { if (sc != null) { sc.stop() + sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("sortByKey") { sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0))) + val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2) assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0))) } @@ -25,18 +28,56 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } - val pairs = sc.parallelize(pairArr) - assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1)) + val pairs = sc.parallelize(pairArr, 2) + val sorted = pairs.sortByKey() + assert(sorted.splits.size === 2) + assert(sorted.collect() === pairArr.sortBy(_._1)) } + test("large array with one split") { + sc = new SparkContext("local", "test") + val rand = new scala.util.Random() + val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } + val pairs = sc.parallelize(pairArr, 2) + val sorted = pairs.sortByKey(true, 1) + assert(sorted.splits.size === 1) + assert(sorted.collect() === pairArr.sortBy(_._1)) + } + + test("large array with many splits") { + sc = new SparkContext("local", "test") + val rand = new scala.util.Random() + val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } + val pairs = sc.parallelize(pairArr, 2) + val sorted = pairs.sortByKey(true, 20) + assert(sorted.splits.size === 20) + assert(sorted.collect() === pairArr.sortBy(_._1)) + } + test("sort descending") { sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } - val pairs = sc.parallelize(pairArr) + val pairs = sc.parallelize(pairArr, 2) assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1)) } + test("sort descending with one split") { + sc = new SparkContext("local", "test") + val rand = new scala.util.Random() + val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } + val pairs = sc.parallelize(pairArr, 1) + assert(pairs.sortByKey(false, 1).collect() === 
pairArr.sortWith((x, y) => x._1 > y._1)) + } + + test("sort descending with many splits") { + sc = new SparkContext("local", "test") + val rand = new scala.util.Random() + val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } + val pairs = sc.parallelize(pairArr, 2) + assert(pairs.sortByKey(false, 20).collect() === pairArr.sortWith((x, y) => x._1 > y._1)) + } + test("more partitions than elements") { sc = new SparkContext("local", "test") val rand = new scala.util.Random() @@ -48,7 +89,7 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with test("empty RDD") { sc = new SparkContext("local", "test") val pairArr = new Array[(Int, Int)](0) - val pairs = sc.parallelize(pairArr) + val pairs = sc.parallelize(pairArr, 2) assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1)) } @@ -58,11 +99,11 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with val sorted = sc.parallelize(pairArr, 4).sortByKey() assert(sorted.collect() === pairArr.sortBy(_._1)) val partitions = sorted.collectPartitions() - logInfo("partition lengths: " + partitions.map(_.length).mkString(", ")) - partitions(0).length should be > 200 - partitions(1).length should be > 200 - partitions(2).length should be > 200 - partitions(3).length should be > 200 + logInfo("Partition lengths: " + partitions.map(_.length).mkString(", ")) + partitions(0).length should be > 180 + partitions(1).length should be > 180 + partitions(2).length should be > 180 + partitions(3).length should be > 180 partitions(0).last should be < partitions(1).head partitions(1).last should be < partitions(2).head partitions(2).last should be < partitions(3).head @@ -75,10 +116,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with assert(sorted.collect() === pairArr.sortBy(_._1).reverse) val partitions = sorted.collectPartitions() logInfo("partition lengths: " + partitions.map(_.length).mkString(", ")) - partitions(0).length should be > 200 - partitions(1).length should be > 200 - partitions(2).length should be > 200 - partitions(3).length should be > 200 + partitions(0).length should be > 180 + partitions(1).length should be > 180 + partitions(2).length should be > 180 + partitions(3).length should be > 180 partitions(0).last should be > partitions(1).head partitions(1).last should be > partitions(2).head partitions(2).last should be > partitions(3).head diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala index 302f731187..e9b1837d89 100644 --- a/core/src/test/scala/spark/ThreadingSuite.scala +++ b/core/src/test/scala/spark/ThreadingSuite.scala @@ -31,6 +31,8 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index f3f891e471..b9c19e61cd 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -13,10 +13,14 @@ import spark.SizeEstimator import spark.util.ByteBufferInputStream class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { + var store: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null - var oldArch: String = _ - var oldOops: String = _ + 
var oldArch: String = null + var oldOops: String = null + + // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + val serializer = new KryoSerializer before { actorSystem = ActorSystem("test") @@ -30,6 +34,9 @@ } after { + if (store != null) { + store.stop() + } actorSystem.shutdown() actorSystem.awaitTermination() actorSystem = null @@ -49,15 +56,15 @@ } test("manager-master interaction") { - val store = new BlockManager(master, new KryoSerializer, 2000) + store = new BlockManager(master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER) - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_DESER) - store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_DESER, false) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false) // Checking whether blocks are in memory assert(store.getSingle("a1") != None, "a1 was not in store") @@ -69,9 +76,9 @@ assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") - // Setting storage level of a1 and a2 to invalid; they should be removed from store and master - store.setLevel("a1", new StorageLevel(false, false, false, 1)) - store.setLevel("a2", new StorageLevel(true, false, false, 0)) + // Drop a1 and a2 from memory; this should be reported back to the master + store.dropFromMemory("a1", null) + store.dropFromMemory("a2", null) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") @@ -79,97 +86,167 @@ } test("in-memory LRU storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER) - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_DESER) - store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_DESER) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") - Thread.sleep(100) assert(store.getSingle("a1") === None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") // At this point a2 was accessed last, so LRU will get rid of a3 - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") != None, "a1 was not in store") assert(store.getSingle("a2") != None, "a2 was not in store") - Thread.sleep(100) assert(store.getSingle("a3") === None, "a3 was in store") }
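The "in-memory LRU storage" test above, together with the per-RDD tests that follow, pins down two behaviours: blocks are evicted least-recently-used first, and caching a partition never evicts other partitions of the same RDD. A compact sketch of that policy (hypothetical; the real MemoryStore tracks byte buffers, pending puts, and drop-to-disk, so the details differ):

import scala.collection.mutable

class LruStore(capacity: Long) {
  private val entries = mutable.LinkedHashMap[String, Long]()  // iteration order = recency order
  private var used = 0L

  // "rdd_0_1" -> "rdd_0"; non-RDD ids such as "a1" just map to themselves
  private def rddPrefix(id: String) = id.split('_').take(2).mkString("_")

  def get(id: String): Boolean = entries.remove(id) match {
    case Some(size) => entries.put(id, size); true  // re-insert to mark as most recently used
    case None => false
  }

  def put(id: String, size: Long): Boolean = {
    if (size > capacity) return false  // an overly large block can never fit
    while (used + size > capacity) {
      val (victim, victimSize) = entries.head  // oldest entry first
      if (victim.startsWith(rddPrefix(id))) return false  // never evict siblings of the incoming block
      entries.remove(victim)
      used -= victimSize
    }
    entries.put(id, size)
    used += size
    true
  }
}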
test("in-memory LRU storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) - store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) - Thread.sleep(100) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") assert(store.getSingle("a1") === None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER) - Thread.sleep(100) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) assert(store.getSingle("a1") != None, "a1 was not in store") assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") === None, "a1 was in store") + assert(store.getSingle("a3") === None, "a3 was in store") } - + + test("in-memory LRU for partitions of same RDD") { + store = new BlockManager(master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) + // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2 + // from the same RDD + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") + assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store") + // Check that rdd_0_3 doesn't replace them even after further accesses + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + } + + test("in-memory LRU for partitions of multiple RDDs") { + store = new BlockManager(master, serializer, 1200) + store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // At this point rdd_1_1 should've replaced rdd_0_1 + assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store") + assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") + assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") + // Do a get() on rdd_0_2 so that it is the most recently used item + assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") + // Put in more partitions from RDD 0; they should replace rdd_1_1 + store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped + // when we try to add rdd_0_4. 
+ assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store") + assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") + assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store") + assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") + assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store") + } + test("on-disk storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.DISK_ONLY) store.putSingle("a2", a2, StorageLevel.DISK_ONLY) store.putSingle("a3", a3, StorageLevel.DISK_ONLY) - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") - assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.getSingle("a2") != None, "a2 was in store") + assert(store.getSingle("a3") != None, "a3 was in store") + assert(store.getSingle("a1") != None, "a1 was in store") } test("disk and memory storage") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.DISK_AND_MEMORY_DESER) - store.putSingle("a2", a2, StorageLevel.DISK_AND_MEMORY_DESER) - store.putSingle("a3", a3, StorageLevel.DISK_AND_MEMORY_DESER) - Thread.sleep(100) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + + test("disk and memory storage with getLocalBytes") { + store = new BlockManager(master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) + assert(store.getLocalBytes("a2") != None, "a2 was not in store") + assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getLocalBytes("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") } test("disk and memory storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.DISK_AND_MEMORY) - store.putSingle("a2", a2, StorageLevel.DISK_AND_MEMORY) - store.putSingle("a3", a3, StorageLevel.DISK_AND_MEMORY) - Thread.sleep(100) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) assert(store.getSingle("a2") != None, "a2 was not in store") 
assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + + test("disk and memory storage with serialization and getLocalBytes") { + store = new BlockManager(master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) + assert(store.getLocalBytes("a2") != None, "a2 was not in store") + assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getLocalBytes("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") } test("LRU with mixed storage levels") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) val a4 = new Array[Byte](400) // First store a1 and a2, both in memory, and a3, on disk only - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a3", a3, StorageLevel.DISK_ONLY) // At this point LRU should not kick in because a3 is only on disk assert(store.getSingle("a1") != None, "a2 was not in store") @@ -179,8 +256,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a2") != None, "a3 was not in store") assert(store.getSingle("a3") != None, "a1 was not in store") // Now let's add in a4, which uses both disk and memory; a1 should drop out - store.putSingle("a4", a4, StorageLevel.DISK_AND_MEMORY) - Thread.sleep(100) + store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) assert(store.getSingle("a1") == None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") @@ -188,14 +264,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_DESER) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_DESER) - store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY_DESER) - Thread.sleep(100) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true) assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) assert(store.get("list3") != None, "list3 was not in store") @@ -204,8 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with 
PrivateMethodT assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_DESER) - Thread.sleep(100) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) assert(store.get("list1") != None, "list1 was not in store") assert(store.get("list1").get.size == 2) assert(store.get("list2") != None, "list2 was not in store") @@ -214,16 +288,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - val store = new BlockManager(master, new KryoSerializer, 1200) + store = new BlockManager(master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) val list4 = List(new Array[Byte](200), new Array[Byte](200)) // First store list1 and list2, both in memory, and list3, on disk only - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY) - store.put("list3", list3.iterator, StorageLevel.DISK_ONLY) - Thread.sleep(100) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true) + store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true) // At this point LRU should not kick in because list3 is only on disk assert(store.get("list1") != None, "list2 was not in store") assert(store.get("list1").get.size === 2) @@ -238,8 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list3") != None, "list1 was not in store") assert(store.get("list3").get.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out - store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY) - Thread.sleep(100) + store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2") != None, "list3 was not in store") assert(store.get("list2").get.size === 2) @@ -260,4 +332,70 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(stream.read() === -1, "end of stream not signalled") assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled") } + + test("overly large block") { + store = new BlockManager(master, serializer, 500) + store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + assert(store.getSingle("a1") === None, "a1 was in store") + store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) + assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") + assert(store.getSingle("a2") != None, "a2 was not in store") + } + + test("block compression") { + try { + System.setProperty("spark.shuffle.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.shuffle.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) 
+ assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "true") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "false") + store = new BlockManager(master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") + store.stop() + store = null + + // Check that any other block types are also kept uncompressed + store = new BlockManager(master, serializer, 2000) + store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") + store.stop() + store = null + } finally { + System.clearProperty("spark.shuffle.compress") + System.clearProperty("spark.broadcast.compress") + System.clearProperty("spark.rdd.compress") + } + } } |
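The closing "block compression" test drives compression per block type through system properties. A sketch of the dispatch it implies (hypothetical method shape; Spark's BlockManager keys compression off block-id prefixes and these same properties, with shuffle and broadcast compression defaulting to on and RDD compression to off):

// Hypothetical sketch of the per-block-type compression switch
def shouldCompress(blockId: String): Boolean =
  if (blockId.startsWith("shuffle_"))
    System.getProperty("spark.shuffle.compress", "true").toBoolean
  else if (blockId.startsWith("broadcast_"))
    System.getProperty("spark.broadcast.compress", "true").toBoolean
  else if (blockId.startsWith("rdd_"))
    System.getProperty("spark.rdd.compress", "false").toBoolean
  else
    false  // anything else stays uncompressed, per the final assertion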