Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/resources/log4j.properties  10
-rw-r--r--  core/src/test/resources/uncommons-maths-1.2.2.jar  bin 0 -> 49019 bytes
-rw-r--r--  core/src/test/scala/spark/AccumulatorSuite.scala  54
-rw-r--r--  core/src/test/scala/spark/BoundedMemoryCacheSuite.scala  14
-rw-r--r--  core/src/test/scala/spark/BroadcastSuite.scala  2
-rw-r--r--  core/src/test/scala/spark/CacheTrackerSuite.scala  2
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala  191
-rw-r--r--  core/src/test/scala/spark/FailureSuite.scala  2
-rw-r--r--  core/src/test/scala/spark/FileServerSuite.scala  110
-rw-r--r--  core/src/test/scala/spark/FileSuite.scala  18
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java  6
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala  25
-rw-r--r--  core/src/test/scala/spark/PartitioningSuite.scala  2
-rw-r--r--  core/src/test/scala/spark/PipedRDDSuite.scala  3
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala  48
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala  45
-rw-r--r--  core/src/test/scala/spark/SizeEstimatorSuite.scala  29
-rw-r--r--  core/src/test/scala/spark/SortingSuite.scala  69
-rw-r--r--  core/src/test/scala/spark/ThreadingSuite.scala  2
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala  254
20 files changed, 783 insertions, 103 deletions
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 02fe16866e..4c99e450bc 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -1,8 +1,10 @@
# Set everything to be logged to the console
-log4j.rootCategory=WARN, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=spark-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
diff --git a/core/src/test/resources/uncommons-maths-1.2.2.jar b/core/src/test/resources/uncommons-maths-1.2.2.jar
new file mode 100644
index 0000000000..e126001c1c
--- /dev/null
+++ b/core/src/test/resources/uncommons-maths-1.2.2.jar
Binary files differ
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index d55969c261..d8be99dde7 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -18,6 +18,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test ("basic accumulation"){
@@ -53,10 +55,11 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
sc.stop()
sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
}
-
implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
t1 ++= t2
@@ -71,7 +74,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
-
test ("value not readable in tasks") {
import SetAccum._
val maxI = 1000
@@ -86,6 +88,54 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
} should produce [SparkException]
sc.stop()
sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+ }
+
+ test ("collection accumulators") {
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) {
+ // test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
+ val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
+ val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
+ val d = sc.parallelize((1 to maxI) ++ (1 to maxI))
+ d.foreach {
+ x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
+ }
+
+ // Note that this is typed correctly -- no casts necessary
+ setAcc.value.size should be (maxI)
+ bufferAcc.value.size should be (2 * maxI)
+ mapAcc.value.size should be (maxI)
+ for (i <- 1 to maxI) {
+ setAcc.value should contain(i)
+ bufferAcc.value should contain(i)
+ mapAcc.value should contain (i -> i.toString)
+ }
+ sc.stop()
+ sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+ }
+
+ test ("localValue readable in tasks") {
+ import SetAccum._
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+ val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
+ val d = sc.parallelize(groupedInts)
+ d.foreach {
+ x => acc.localValue ++= x
+ }
+ acc.value should be ( (0 to maxI).toSet)
+ sc.stop()
+ sc = null
}
}
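
The SetAccum object used by the accumulator tests above is only partially visible in this hunk (just addInPlace). The sketch below fills in the likely remainder for reference; the addAccumulator and zero signatures are assumptions about the AccumulableParam trait in this version of Spark, not lines from the patch.

    import scala.collection.mutable
    import spark.AccumulableParam

    // Sketch of a set-valued AccumulableParam (method shapes assumed, not from the patch).
    object SetParamSketch extends AccumulableParam[mutable.Set[Any], Any] {
      // Merge two per-partition results.
      def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]): mutable.Set[Any] = {
        t1 ++= t2
        t1
      }
      // Fold one task-produced element into a partial result.
      def addAccumulator(t1: mutable.Set[Any], t2: Any): mutable.Set[Any] = {
        t1 += t2
        t1
      }
      // Initial value for fresh per-task accumulators.
      def zero(initialValue: mutable.Set[Any]): mutable.Set[Any] = new mutable.HashSet[Any]()
    }

Declared implicit inside a suite, a parameter like this is what lets sc.accumulable(new mutable.HashSet[Any]()) in the "localValue readable in tasks" test resolve.
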
diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
index dff2970566..37cafd1e8e 100644
--- a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
+++ b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
@@ -2,8 +2,10 @@ package spark
import org.scalatest.FunSuite
import org.scalatest.PrivateMethodTester
+import org.scalatest.matchers.ShouldMatchers
-class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester {
+// TODO: Replace this with a test of MemoryStore
+class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester with ShouldMatchers {
test("constructor test") {
val cache = new BoundedMemoryCache(60)
expect(60)(cache.getCapacity)
@@ -22,15 +24,21 @@ class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester {
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
}
}
+
+ // NOTE: The String class definition changed in JDK 7 to exclude the int fields offset and count.
+ // This means that the size of a String will be 8 bytes smaller in JDK 7 than in JDK 6.
+ // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
+ // Work around this by accepting either size.
+
//should be OK
- expect(CachePutSuccess(56))(cache.put("1", 0, "Meh"))
+ cache.put("1", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48)))
//we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
//cache because it's from the same dataset
expect(CachePutFailure())(cache.put("1", 1, "Meh"))
//should be OK, dataset '1' can be evicted from cache
- expect(CachePutSuccess(56))(cache.put("2", 0, "Meh"))
+ cache.put("2", 0, "Meh") should (equal (CachePutSuccess(56)) or equal (CachePutSuccess(48)))
//should fail, cache should obey its capacity
expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
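
The JDK 6/JDK 7 hedge above (accepting either of two byte counts) reappears in SizeEstimatorSuite further down. A hypothetical helper along these lines could factor it out; the trait and method names are illustrative, not part of the patch.

    import org.scalatest.matchers.ShouldMatchers

    // Hypothetical helper, not in this patch: accept either the JDK 6 size or the JDK 7 size
    // (String lost two int fields in JDK 7, so estimates shrink by 8 bytes).
    trait EitherJdkSize extends ShouldMatchers {
      def assertSize(actual: Long, jdk6Size: Long, jdk7Size: Long) {
        actual should (equal (jdk6Size) or equal (jdk7Size))
      }
    }
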
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
index 0738a2725b..2d3302f0aa 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -12,6 +12,8 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("basic broadcast") {
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
index 426c0d26e9..467605981b 100644
--- a/core/src/test/scala/spark/CacheTrackerSuite.scala
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite {
} catch {
case e: Exception =>
throw new SparkException("Error communicating with actor", e)
- }
+ }
}
test("CacheTrackerActor slave initialization & cache status") {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
new file mode 100644
index 0000000000..cacc2796b6
--- /dev/null
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -0,0 +1,191 @@
+package spark
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import com.google.common.io.Files
+
+import scala.collection.mutable.ArrayBuffer
+
+import SparkContext._
+import storage.StorageLevel
+
+class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
+
+ val clusterUrl = "local-cluster[2,1,512]"
+
+ @transient var sc: SparkContext = _
+
+ after {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ System.clearProperty("spark.reducer.maxMbInFlight")
+ System.clearProperty("spark.storage.memoryFraction")
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
+ test("local-cluster format") {
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ sc.stop()
+ System.clearProperty("spark.master.port")
+ sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ sc.stop()
+ System.clearProperty("spark.master.port")
+ sc = new SparkContext("local-cluster[2, 1, 512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ sc.stop()
+ System.clearProperty("spark.master.port")
+ sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ sc.stop()
+ System.clearProperty("spark.master.port")
+ sc = null
+ }
+
+ test("simple groupByKey") {
+ sc = new SparkContext(clusterUrl, "test")
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
+ val groups = pairs.groupByKey(5).collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey where map output sizes exceed maxMbInFlight") {
+ System.setProperty("spark.reducer.maxMbInFlight", "1")
+ sc = new SparkContext(clusterUrl, "test")
+ // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
+ // file should be about 2.5 MB
+ val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
+ val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
+ assert(groups.length === 16)
+ assert(groups.map(_._2).sum === 2000)
+ // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
+ }
+
+ test("accumulators") {
+ sc = new SparkContext(clusterUrl, "test")
+ val accum = sc.accumulator(0)
+ sc.parallelize(1 to 10, 10).foreach(x => accum += x)
+ assert(accum.value === 55)
+ }
+
+ test("broadcast variables") {
+ sc = new SparkContext(clusterUrl, "test")
+ val array = new Array[Int](100)
+ val bv = sc.broadcast(array)
+ array(2) = 3 // Change the array -- this should not be seen on workers
+ val rdd = sc.parallelize(1 to 10, 10)
+ val sum = rdd.map(x => bv.value.sum).reduce(_ + _)
+ assert(sum === 0)
+ }
+
+ test("repeatedly failing task") {
+ sc = new SparkContext(clusterUrl, "test")
+ val accum = sc.accumulator(0)
+ val thrown = intercept[SparkException] {
+ sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("more than 4 times"))
+ }
+
+ test("caching") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).cache()
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching on disk") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory, serialized, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching on disk, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory and disk, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory and disk, serialized, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("compute without caching when no partitions fit in memory") {
+ System.setProperty("spark.storage.memoryFraction", "0.0001")
+ sc = new SparkContext(clusterUrl, "test")
+ // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
+ // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
+ val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ System.clearProperty("spark.storage.memoryFraction")
+ }
+
+ test("compute when only some partitions fit in memory") {
+ System.setProperty("spark.storage.memoryFraction", "0.01")
+ sc = new SparkContext(clusterUrl, "test")
+ // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
+ // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
+ // to make sure that *some* of them do fit though
+ val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ System.clearProperty("spark.storage.memoryFraction")
+ }
+
+ test("passing environment variables to cluster") {
+ sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
+ val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
+ assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
+ }
+}
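
The first test above checks that the local-cluster master URL tolerates whitespace between its three fields (number of workers, cores per worker, memory per worker in MB). The sketch below is an illustrative parser for that format, not Spark's actual implementation.

    // Illustrative only: decompose "local-cluster[workers, coresPerWorker, memoryPerWorkerMB]",
    // tolerating spaces around the numbers as the test above requires.
    val LocalCluster = """local-cluster\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]""".r

    def parseLocalCluster(master: String): Option[(Int, Int, Int)] = master match {
      case LocalCluster(workers, cores, memoryMB) =>
        Some((workers.toInt, cores.toInt, memoryMB.toInt))
      case _ => None
    }

    // parseLocalCluster("local-cluster[ 2, 1, 512 ]") == Some((2, 1, 512))
    // parseLocalCluster("local[4]")                   == None
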
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 4405829161..a3454f25f6 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -32,6 +32,8 @@ class FailureSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
// Run a 3-task map job in which task 1 deterministically fails once, and check
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
new file mode 100644
index 0000000000..b4283d9604
--- /dev/null
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -0,0 +1,110 @@
+package spark
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
+import SparkContext._
+
+class FileServerSuite extends FunSuite with BeforeAndAfter {
+
+ @transient var sc: SparkContext = _
+ @transient var tmpFile : File = _
+ @transient var testJarFile : File = _
+
+ before {
+ // Create a sample text file
+ val tmpdir = new File(Files.createTempDir(), "test")
+ tmpdir.mkdir()
+ tmpFile = new File(tmpdir, "FileServerSuite.txt")
+ val pw = new PrintWriter(tmpFile)
+ pw.println("100")
+ pw.close()
+ }
+
+ after {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ // Clean up downloaded file
+ if (tmpFile.exists) {
+ tmpFile.delete()
+ }
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
+ test("Distributing files locally") {
+ sc = new SparkContext("local[4]", "test")
+ sc.addFile(tmpFile.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test("Distributing files locally using URL as input") {
+ // addFile("file:///....")
+ sc = new SparkContext("local[4]", "test")
+ sc.addFile((new File(tmpFile.toString)).toURL.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test ("Dynamically adding JARS locally") {
+ sc = new SparkContext("local[4]", "test")
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ }
+
+ test("Distributing files on a standalone cluster") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ sc.addFile(tmpFile.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect
+ println(result)
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test ("Dynamically adding JARS on a standalone cluster") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ }
+}
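
The two "Dynamically adding JARS" tests call Maths.factorial reflectively through the task's context class loader, since the class only becomes visible after addJar ships the jar. The snippet below shows the same reflective pattern against java.lang.Math so it runs without uncommons-maths on the classpath; the choice of Math.abs is purely illustrative.

    // Same reflective-call shape as the tests above, using a class that is always available.
    // A static method is invoked with a null receiver and a boxed argument.
    val abs = Thread.currentThread.getContextClassLoader
      .loadClass("java.lang.Math")
      .getDeclaredMethod("abs", classOf[Int])
    val result = abs.invoke(null, (-3).asInstanceOf[java.lang.Integer]).asInstanceOf[Int]
    assert(result == 3)
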
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 17c7a8de43..554bea53a9 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -1,6 +1,6 @@
package spark
-import java.io.File
+import java.io.{FileWriter, PrintWriter, File}
import scala.io.Source
@@ -20,6 +20,8 @@ class FileSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("text files") {
@@ -142,4 +144,18 @@ class FileSuite extends FunSuite with BeforeAndAfter {
sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}
+
+ test("file caching") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val out = new FileWriter(tempDir + "/input")
+ out.write("Hello world!\n")
+ out.write("What's up?\n")
+ out.write("Goodbye\n")
+ out.close()
+ val rdd = sc.textFile(tempDir + "/input").cache()
+ assert(rdd.count() === 3)
+ assert(rdd.count() === 3)
+ assert(rdd.count() === 3)
+ }
}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 24bf021710..5875506179 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -112,15 +112,19 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
}
+ static int foreachCalls = 0;
+
@Test
public void foreach() {
+ foreachCalls = 0;
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String s) {
- System.out.println(s);
+ foreachCalls++;
}
});
+ Assert.assertEquals(2, foreachCalls);
}
@Test
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
new file mode 100644
index 0000000000..4e9717d871
--- /dev/null
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -0,0 +1,25 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class MapOutputTrackerSuite extends FunSuite {
+ test("compressSize") {
+ assert(MapOutputTracker.compressSize(0L) === 0)
+ assert(MapOutputTracker.compressSize(1L) === 0)
+ assert(MapOutputTracker.compressSize(2L) === 8)
+ assert(MapOutputTracker.compressSize(10L) === 25)
+ assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
+ assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218)
+ // This last size is bigger than we can encode in a byte, so check that we just return 255
+ assert((MapOutputTracker.compressSize(1000000000000000000L) & 0xFF) === 255)
+ }
+
+ test("decompressSize") {
+ assert(MapOutputTracker.decompressSize(0) === 1)
+ for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
+ val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
+ assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
+ "size " + size + " decompressed to " + size2 + ", which is out of range")
+ }
+ }
+}
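
The tolerances above (a decompressed size between 0.99x and 1.11x of the original) are what you get from encoding a size as a rounded-up logarithm in base 1.1, clamped to one unsigned byte. The sketch below reproduces that scheme to make the expected values concrete; it is inferred from the assertions, not copied from MapOutputTracker.

    // Inferred encoding: one unsigned byte holds ceil(log_1.1(size)), clamped to 255.
    object SizeCodecSketch {
      private val LogBase = 1.1

      def compress(size: Long): Byte =
        if (size <= 1L) 0
        else math.min(255, math.ceil(math.log(size) / math.log(LogBase)).toInt).toByte

      def decompress(compressed: Byte): Long =
        math.pow(LogBase, compressed & 0xFF).toLong

      // compress(2L) == 8, compress(10L) == 25, (compress(1000000L) & 0xFF) == 145,
      // and decompress(compress(1000000L)) lands within roughly 10% of the original.
    }
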
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 5000fa7307..3dadc7acec 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -16,6 +16,8 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index 426652dc15..9b84b29227 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -13,6 +13,8 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("basic pipe") {
@@ -22,7 +24,6 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
val piped = nums.pipe(Seq("cat"))
val c = piped.collect()
- println(c.toSeq)
assert(c.size === 4)
assert(c(0) === "1")
assert(c(1) === "2")
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index ba9b36adb7..37a0ff0947 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,6 +3,8 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
+
+import spark.rdd.CoalescedRDD
import SparkContext._
class RDDSuite extends FunSuite with BeforeAndAfter {
@@ -14,6 +16,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("basic operations") {
@@ -29,6 +33,11 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))
+
+ val partitionSumsWithSplit = nums.mapPartitionsWithSplit {
+ case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
+ }
+ assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7)))
}
test("SparkContext.union") {
@@ -66,4 +75,43 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint()
assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
}
+
+ test("basic caching") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ }
+
+ test("coalesced RDDs") {
+ sc = new SparkContext("local", "test")
+ val data = sc.parallelize(1 to 10, 10)
+
+ val coalesced1 = new CoalescedRDD(data, 2)
+ assert(coalesced1.collect().toList === (1 to 10).toList)
+ assert(coalesced1.glom().collect().map(_.toList).toList ===
+ List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
+
+ // Check that the narrow dependency is also specified correctly
+ assert(coalesced1.dependencies.head.getParents(0).toList === List(0, 1, 2, 3, 4))
+ assert(coalesced1.dependencies.head.getParents(1).toList === List(5, 6, 7, 8, 9))
+
+ val coalesced2 = new CoalescedRDD(data, 3)
+ assert(coalesced2.collect().toList === (1 to 10).toList)
+ assert(coalesced2.glom().collect().map(_.toList).toList ===
+ List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
+
+ val coalesced3 = new CoalescedRDD(data, 10)
+ assert(coalesced3.collect().toList === (1 to 10).toList)
+ assert(coalesced3.glom().collect().map(_.toList).toList ===
+ (1 to 10).map(x => List(x)).toList)
+
+ // If we try to coalesce into more partitions than the original RDD, it should just
+ // keep the original number of partitions.
+ val coalesced4 = new CoalescedRDD(data, 20)
+ assert(coalesced4.collect().toList === (1 to 10).toList)
+ assert(coalesced4.glom().collect().map(_.toList).toList ===
+ (1 to 10).map(x => List(x)).toList)
+ }
}
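
The glom() and getParents assertions above pin down how ten parent splits are grouped into fewer coalesced partitions. The grouping they expect can be written as the sketch below, which is an inference from the test, not the CoalescedRDD implementation itself.

    // Inferred grouping: coalesced partition i (of n) takes parent splits in
    // [i * prev / n, (i + 1) * prev / n); asking for more partitions than the parent
    // has just keeps the original number.
    def coalescedParents(prevSplits: Int, requested: Int): Seq[List[Int]] = {
      val n = math.min(requested, prevSplits)
      (0 until n).map(i => (i * prevSplits / n until (i + 1) * prevSplits / n).toList)
    }

    // coalescedParents(10, 2)  == List(List(0, 1, 2, 3, 4), List(5, 6, 7, 8, 9))
    // coalescedParents(10, 3)  == List(List(0, 1, 2), List(3, 4, 5), List(6, 7, 8, 9))
    // coalescedParents(10, 20) == ten singleton lists, matching the coalesced4 case
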
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 99d13b31ef..8170100f1d 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -1,7 +1,10 @@
package spark
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.ShouldMatchers
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
@@ -9,21 +12,22 @@ import org.scalacheck.Prop._
import com.google.common.io.Files
-import scala.collection.mutable.ArrayBuffer
+import spark.rdd.ShuffledRDD
+import spark.SparkContext._
-import SparkContext._
+class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-class ShuffleSuite extends FunSuite with BeforeAndAfter {
-
var sc: SparkContext = _
-
+
after {
if (sc != null) {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
-
+
test("groupByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -56,7 +60,7 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter {
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
-
+
test("groupByKey with many output partitions") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -68,6 +72,22 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter {
assert(valuesFor2.toList.sorted === List(1))
}
+ test("groupByKey with compression") {
+ try {
+ System.setProperty("spark.blockManager.compress", "true")
+ sc = new SparkContext("local", "test")
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+ val groups = pairs.groupByKey(4).collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ } finally {
+ System.setProperty("spark.blockManager.compress", "false")
+ }
+ }
+
test("reduceByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
@@ -186,7 +206,7 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter {
(4, (ArrayBuffer(), ArrayBuffer('w')))
))
}
-
+
test("zero-partition RDD") {
sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
@@ -194,6 +214,13 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter {
assert(file.splits.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
- assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ }
+}
+
+object ShuffleSuite {
+ def mergeCombineException(x: Int, y: Int): Int = {
+ throw new SparkException("Exception for map-side combine.")
+ x + y
}
}
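
Both the compression test above and the block-compression test in BlockManagerSuite further down wrap a system-property change in try/finally. A small helper like this one (hypothetical, not in the patch) captures that pattern and also restores any pre-existing value instead of unconditionally resetting it.

    // Hypothetical helper: run a block with a system property temporarily set,
    // restoring the previous value (or clearing the property) afterwards.
    def withSystemProperty[T](key: String, value: String)(body: => T): T = {
      val previous = System.getProperty(key)
      System.setProperty(key, value)
      try {
        body
      } finally {
        if (previous == null) System.clearProperty(key)
        else System.setProperty(key, previous)
      }
    }

    // withSystemProperty("spark.blockManager.compress", "true") { /* run the compressed case */ }
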
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
index a2015644ee..17f366212b 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala
@@ -3,6 +3,7 @@ package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
+import org.scalatest.matchers.ShouldMatchers
class DummyClass1 {}
@@ -19,7 +20,9 @@ class DummyClass4(val d: DummyClass3) {
val x: Int = 0
}
-class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
+class SizeEstimatorSuite
+ extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers {
+
var oldArch: String = _
var oldOops: String = _
@@ -42,11 +45,15 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet
expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
+ // NOTE: The String class definition changed in JDK 7 to exclude the int fields offset and count.
+ // This means that the size of a String will be 8 bytes smaller in JDK 7 than in JDK 6.
+ // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
+ // Work around this by accepting either size.
test("strings") {
- expect(48)(SizeEstimator.estimate(""))
- expect(56)(SizeEstimator.estimate("a"))
- expect(56)(SizeEstimator.estimate("ab"))
- expect(64)(SizeEstimator.estimate("abcdefgh"))
+ SizeEstimator.estimate("") should (equal (48) or equal (40))
+ SizeEstimator.estimate("a") should (equal (56) or equal (48))
+ SizeEstimator.estimate("ab") should (equal (56) or equal (48))
+ SizeEstimator.estimate("abcdefgh") should (equal(64) or equal(56))
}
test("primitive arrays") {
@@ -106,6 +113,10 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet
resetOrClear("os.arch", arch)
}
+ // NOTE: The String class definition changed in JDK 7 to exclude the int fields offset and count.
+ // This means that the size of a String will be 8 bytes smaller in JDK 7 than in JDK 6.
+ // http://mail.openjdk.java.net/pipermail/core-libs-dev/2012-May/010257.html
+ // Work around this by accepting either size.
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
@@ -113,10 +124,10 @@ class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMet
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- expect(64)(SizeEstimator.estimate(""))
- expect(72)(SizeEstimator.estimate("a"))
- expect(72)(SizeEstimator.estimate("ab"))
- expect(80)(SizeEstimator.estimate("abcdefgh"))
+ SizeEstimator.estimate("") should (equal (64) or equal (56))
+ SizeEstimator.estimate("a") should (equal (72) or equal (64))
+ SizeEstimator.estimate("ab") should (equal (72) or equal (64))
+ SizeEstimator.estimate("abcdefgh") should (equal (80) or equal (72))
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index 8fa1442a4d..1ad11ff4c3 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -12,12 +12,15 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
after {
if (sc != null) {
sc.stop()
+ sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("sortByKey") {
sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)))
+ val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
}
@@ -25,18 +28,56 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr)
- assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
+ val pairs = sc.parallelize(pairArr, 2)
+ val sorted = pairs.sortByKey()
+ assert(sorted.splits.size === 2)
+ assert(sorted.collect() === pairArr.sortBy(_._1))
}
+ test("large array with one split") {
+ sc = new SparkContext("local", "test")
+ val rand = new scala.util.Random()
+ val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
+ val pairs = sc.parallelize(pairArr, 2)
+ val sorted = pairs.sortByKey(true, 1)
+ assert(sorted.splits.size === 1)
+ assert(sorted.collect() === pairArr.sortBy(_._1))
+ }
+
+ test("large array with many splits") {
+ sc = new SparkContext("local", "test")
+ val rand = new scala.util.Random()
+ val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
+ val pairs = sc.parallelize(pairArr, 2)
+ val sorted = pairs.sortByKey(true, 20)
+ assert(sorted.splits.size === 20)
+ assert(sorted.collect() === pairArr.sortBy(_._1))
+ }
+
test("sort descending") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr)
+ val pairs = sc.parallelize(pairArr, 2)
assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
}
+ test("sort descending with one split") {
+ sc = new SparkContext("local", "test")
+ val rand = new scala.util.Random()
+ val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
+ val pairs = sc.parallelize(pairArr, 1)
+ assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
+ }
+
+ test("sort descending with many splits") {
+ sc = new SparkContext("local", "test")
+ val rand = new scala.util.Random()
+ val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
+ val pairs = sc.parallelize(pairArr, 2)
+ assert(pairs.sortByKey(false, 20).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
+ }
+
test("more partitions than elements") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
@@ -48,7 +89,7 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
test("empty RDD") {
sc = new SparkContext("local", "test")
val pairArr = new Array[(Int, Int)](0)
- val pairs = sc.parallelize(pairArr)
+ val pairs = sc.parallelize(pairArr, 2)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
}
@@ -58,11 +99,11 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
val sorted = sc.parallelize(pairArr, 4).sortByKey()
assert(sorted.collect() === pairArr.sortBy(_._1))
val partitions = sorted.collectPartitions()
- logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
- partitions(0).length should be > 200
- partitions(1).length should be > 200
- partitions(2).length should be > 200
- partitions(3).length should be > 200
+ logInfo("Partition lengths: " + partitions.map(_.length).mkString(", "))
+ partitions(0).length should be > 180
+ partitions(1).length should be > 180
+ partitions(2).length should be > 180
+ partitions(3).length should be > 180
partitions(0).last should be < partitions(1).head
partitions(1).last should be < partitions(2).head
partitions(2).last should be < partitions(3).head
@@ -75,10 +116,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
val partitions = sorted.collectPartitions()
logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
- partitions(0).length should be > 200
- partitions(1).length should be > 200
- partitions(2).length should be > 200
- partitions(3).length should be > 200
+ partitions(0).length should be > 180
+ partitions(1).length should be > 180
+ partitions(2).length should be > 180
+ partitions(3).length should be > 180
partitions(0).last should be > partitions(1).head
partitions(1).last should be > partitions(2).head
partitions(2).last should be > partitions(3).head
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index 302f731187..e9b1837d89 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -31,6 +31,8 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index f3f891e471..b9c19e61cd 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -13,10 +13,14 @@ import spark.SizeEstimator
import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+ var store: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
- var oldArch: String = _
- var oldOops: String = _
+ var oldArch: String = null
+ var oldOops: String = null
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ val serializer = new KryoSerializer
before {
actorSystem = ActorSystem("test")
@@ -30,6 +34,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
after {
+ if (store != null) {
+ store.stop()
+ }
actorSystem.shutdown()
actorSystem.awaitTermination()
actorSystem = null
@@ -49,15 +56,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("manager-master interaction") {
- val store = new BlockManager(master, new KryoSerializer, 2000)
+ store = new BlockManager(master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_DESER)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_DESER, false)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
// Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
@@ -69,9 +76,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
- // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
- store.setLevel("a1", new StorageLevel(false, false, false, 1))
- store.setLevel("a2", new StorageLevel(true, false, false, 0))
+ // Drop a1 and a2 from memory; this should be reported back to the master
+ store.dropFromMemory("a1", null)
+ store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
@@ -79,97 +86,167 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_DESER)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_DESER)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
- Thread.sleep(100)
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
- Thread.sleep(100)
assert(store.getSingle("a3") === None, "a3 was in store")
}
test("in-memory LRU storage with serialization") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
- Thread.sleep(100)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER)
- Thread.sleep(100)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") === None, "a1 was in store")
+ assert(store.getSingle("a3") === None, "a3 was in store")
}
-
+
+ test("in-memory LRU for partitions of same RDD") {
+ store = new BlockManager(master, serializer, 1200)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
+ // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
+ // from the same RDD
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+ assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
+ // Check that rdd_0_3 doesn't replace them even after further accesses
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ }
+
+ test("in-memory LRU for partitions of multiple RDDs") {
+ store = new BlockManager(master, serializer, 1200)
+ store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ // At this point rdd_1_1 should've replaced rdd_0_1
+ assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
+ assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+ assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+ // Do a get() on rdd_0_2 so that it is the most recently used item
+ assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+ // Put in more partitions from RDD 0; they should replace rdd_1_1
+ store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
+ // when we try to add rdd_0_4.
+ assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
+ assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+ assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store")
+ assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+ assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
+ }
+
test("on-disk storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
- assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(store.getSingle("a2") != None, "a2 was in store")
+ assert(store.getSingle("a3") != None, "a3 was in store")
+ assert(store.getSingle("a1") != None, "a1 was in store")
}
test("disk and memory storage") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.DISK_AND_MEMORY_DESER)
- store.putSingle("a2", a2, StorageLevel.DISK_AND_MEMORY_DESER)
- store.putSingle("a3", a3, StorageLevel.DISK_AND_MEMORY_DESER)
- Thread.sleep(100)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ }
+
+ test("disk and memory storage with getLocalBytes") {
+ store = new BlockManager(master, serializer, 1200)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+ assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+ assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+ assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
}
test("disk and memory storage with serialization") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.DISK_AND_MEMORY)
- store.putSingle("a2", a2, StorageLevel.DISK_AND_MEMORY)
- store.putSingle("a3", a3, StorageLevel.DISK_AND_MEMORY)
- Thread.sleep(100)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ }
+
+ test("disk and memory storage with serialization and getLocalBytes") {
+ store = new BlockManager(master, serializer, 1200)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+ assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+ assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+ assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
}
test("LRU with mixed storage levels") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
val a4 = new Array[Byte](400)
// First store a1 and a2, both in memory, and a3, on disk only
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
assert(store.getSingle("a1") != None, "a2 was not in store")
@@ -179,8 +256,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a2") != None, "a3 was not in store")
assert(store.getSingle("a3") != None, "a1 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
- store.putSingle("a4", a4, StorageLevel.DISK_AND_MEMORY)
- Thread.sleep(100)
+ store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingle("a1") == None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
@@ -188,14 +264,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_DESER)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_DESER)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY_DESER)
- Thread.sleep(100)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -204,8 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will get rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_DESER)
- Thread.sleep(100)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -214,16 +288,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- val store = new BlockManager(master, new KryoSerializer, 1200)
+ store = new BlockManager(master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
val list4 = List(new Array[Byte](200), new Array[Byte](200))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
- Thread.sleep(100)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -238,8 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3") != None, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY)
- Thread.sleep(100)
+ store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)
@@ -260,4 +332,70 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(stream.read() === -1, "end of stream not signalled")
assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled")
}
+
+ test("overly large block") {
+ store = new BlockManager(master, serializer, 500)
+ store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+ assert(store.getSingle("a1") === None, "a1 was in store")
+ store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+ assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+ assert(store.getSingle("a2") != None, "a2 was not in store")
+ }
+
+ test("block compression") {
+ try {
+ System.setProperty("spark.shuffle.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.shuffle.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.broadcast.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.broadcast.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.rdd.compress", "true")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
+ store.stop()
+ store = null
+
+ System.setProperty("spark.rdd.compress", "false")
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
+ store.stop()
+ store = null
+
+ // Check that any other block types are also kept uncompressed
+ store = new BlockManager(master, serializer, 2000)
+ store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+ assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+ store.stop()
+ store = null
+ } finally {
+ System.clearProperty("spark.shuffle.compress")
+ System.clearProperty("spark.broadcast.compress")
+ System.clearProperty("spark.rdd.compress")
+ }
+ }
}
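
The two "in-memory LRU for partitions" tests above encode an eviction rule: when space is needed for a new RDD block, other partitions of that same RDD are never the ones dropped. Below is a sketch of that predicate, inferred from the assertions rather than taken from MemoryStore.

    // Inferred eviction rule: a cached block may be evicted to make room for an incoming
    // block unless both are partitions of the same RDD (block ids of the form "rdd_<id>_<split>").
    def rddIdOf(blockId: String): Option[String] =
      if (blockId.startsWith("rdd_")) Some(blockId.split('_')(1)) else None

    def mayEvictFor(candidate: String, incoming: String): Boolean =
      (rddIdOf(candidate), rddIdOf(incoming)) match {
        case (Some(a), Some(b)) if a == b => false  // keep partitions of the RDD being cached
        case _ => true
      }

    // mayEvictFor("rdd_0_1", "rdd_0_3") == false  -- so rdd_0_3 fails to fit in the first test
    // mayEvictFor("rdd_0_1", "rdd_1_1") == true   -- so rdd_1_1 replaces rdd_0_1 in the second
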