author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-09-29 20:21:54 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-09-29 20:21:54 -0700
commit     9b326d01e9a9ec4a4a9abf293cf039c07d426293 (patch)
tree       67283c4ae24bf48014715a19129c60833280c389 /core/src/test/scala
parent     56dcad593641ef8de211fcb4303574a9f4509f89 (diff)
Made BlockManager unmap memory-mapped files when necessary to reduce the
number of open files. Also optimized sending of disk-based blocks.
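
On the JVM, a memory-mapped file is only released when its MappedByteBuffer is garbage collected, so mapped blocks can keep file descriptors open long after they are read. The sketch below shows the usual workaround on the JDK 6/7 line this 2012 commit targeted: forcing the unmap through the unsafe sun.nio.ch.DirectBuffer cleaner. It is a minimal illustration of the technique named in the commit message, not the BlockManager code from this commit; the MmapExample object and its unmap helper are invented for this example.

import java.io.RandomAccessFile
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel.MapMode

// Illustrative only -- not the BlockManager implementation from this commit.
object MmapExample {

  // Force-unmap a memory-mapped buffer instead of waiting for GC, so the
  // underlying file descriptor is released promptly. Relies on the unsafe
  // sun.nio.ch.DirectBuffer cleaner available on the JDK 6/7/8 line.
  def unmap(buffer: ByteBuffer) {
    buffer match {
      case mapped: MappedByteBuffer =>
        val cleaner = mapped.asInstanceOf[sun.nio.ch.DirectBuffer].cleaner()
        if (cleaner != null) {
          cleaner.clean()  // touching the buffer after this is undefined
        }
      case _ =>  // heap or non-mapped direct buffers need no special handling
    }
  }

  def main(args: Array[String]) {
    val file = new RandomAccessFile("/tmp/mmap-example", "rw")
    try {
      file.setLength(1024)
      val buf = file.getChannel.map(MapMode.READ_WRITE, 0, 1024)
      buf.put(0, 42.toByte)
      unmap(buf)  // release the mapping immediately rather than at GC time
    } finally {
      file.close()
    }
  }
}

For the second half of the commit message, reading a disk block into an ordinary ByteBuffer via FileChannel.read avoids creating a mapping at all; whether that is the optimization applied here is not visible from the test diff alone.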
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala | 60
1 file changed, 58 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 93b876d205..fce1deaa5c 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -13,6 +13,7 @@ import com.google.common.io.Files
import scala.collection.mutable.ArrayBuffer
import SparkContext._
+import storage.StorageLevel
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
@@ -26,7 +27,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc = null
}
}
-
+
test("simple groupByKey") {
sc = new SparkContext(clusterUrl, "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
@@ -64,5 +65,60 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(thrown.getClass === classOf[SparkException])
assert(thrown.getMessage.contains("more than 4 times"))
}
-}
+ test("caching") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).cache()
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching on disk") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory, serialized, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching on disk, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory and disk, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory and disk, serialized, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+}