author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-29 20:21:54 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-29 20:21:54 -0700 |
commit | 9b326d01e9a9ec4a4a9abf293cf039c07d426293 (patch) | |
tree | 67283c4ae24bf48014715a19129c60833280c389 | /core/src/test/scala |
parent | 56dcad593641ef8de211fcb4303574a9f4509f89 (diff) | |
Made BlockManager unmap memory-mapped files when necessary to reduce the
number of open files. Also optimized sending of disk-based blocks.
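The "unmap memory-mapped files" part targets a JVM limitation: a `MappedByteBuffer` keeps its file mapping (and the underlying file handle) alive until the buffer object is garbage-collected, so long-running executors can accumulate open files. Below is a minimal sketch of the workaround commonly used on 2012-era Sun/Oracle JVMs, which triggers the JDK-internal `sun.nio.ch.DirectBuffer` cleaner; `readBlockAndUnmap` is a hypothetical helper for illustration, not the commit's actual code:

```scala
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel.MapMode

// Hypothetical helper: read a block file through a memory mapping,
// then release the mapping eagerly instead of waiting for GC.
def readBlockAndUnmap(blockFilePath: String): Unit = {
  val file = new RandomAccessFile(blockFilePath, "r")
  val buffer: MappedByteBuffer =
    try file.getChannel.map(MapMode.READ_ONLY, 0, file.length())
    finally file.close()  // the mapping stays valid after the file is closed

  // ... consume the buffer ...

  // The JVM offers no public unmap call, so on Sun/Oracle JVMs the usual
  // workaround is to invoke the buffer's internal cleaner, which releases
  // the mapping (and its file handle) immediately.
  buffer match {
    case direct: sun.nio.ch.DirectBuffer if direct.cleaner() != null =>
      direct.cleaner().clean()
    case _ => // not a direct buffer; nothing to unmap
  }
}
```

Forcing the cleaner trades portability (it relies on JDK internals) for deterministic release of the mapping, which is presumably why the commit message says "when necessary" rather than always.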
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/DistributedSuite.scala | 60 |
1 file changed, 58 insertions(+), 2 deletions(-)
```diff
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 93b876d205..fce1deaa5c 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -13,6 +13,7 @@ import com.google.common.io.Files
 import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
+import storage.StorageLevel
 
 class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
 
@@ -26,7 +27,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       sc = null
     }
   }
-  
+
  test("simple groupByKey") {
    sc = new SparkContext(clusterUrl, "test")
    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
@@ -64,5 +65,60 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.contains("more than 4 times"))
   }
-}
 
+  test("caching") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).cache()
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching on disk") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory, serialized, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching on disk, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory and disk, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory and disk, serialized, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+}
```
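For readers skimming the new tests: in Spark's `StorageLevel` naming, the `_2` suffix requests two replicas of each block and `SER` keeps the in-memory copy serialized. The following is a rough model of what each level encodes, a sketch rather than the Spark source; the field names are illustrative only:

```scala
// Sketch only: models the semantics of the StorageLevel constants used in
// the tests above. Not Spark's actual API.
object StorageLevels {
  case class Level(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int)

  val MEMORY_ONLY       = Level(useDisk = false, useMemory = true,  deserialized = true,  replication = 1)
  val DISK_ONLY_2       = Level(useDisk = true,  useMemory = false, deserialized = false, replication = 2)
  val MEMORY_ONLY_SER_2 = Level(useDisk = false, useMemory = true,  deserialized = false, replication = 2)
  val MEMORY_AND_DISK_2 = Level(useDisk = true,  useMemory = true,  deserialized = true,  replication = 2)
}
```

Each test calls `count()` three times so that the second and third runs are served from the persisted (disk-resident and/or replicated) copies, exercising the BlockManager read paths this commit changes.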