diff options
author | Prashant Sharma <scrapcodes@gmail.com> | 2013-09-15 10:55:12 +0530 |
---|---|---|
committer | Prashant Sharma <scrapcodes@gmail.com> | 2013-09-15 10:55:12 +0530 |
commit | 383e151fd7138cc6a143b3a38037cc3038c2a8b9 (patch) | |
tree | 0dbb2c0d8fdeff4c37a577eb96acb87ee7838a16 /core/src/test/scala/org | |
parent | 20c65bc334091d8d05fb680551155aa182d98f7d (diff) | |
parent | c4c1db2dd5b2ec0a8182369ecdb0e14f4e199822 (diff) | |
download | spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.tar.gz spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.tar.bz2 spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.zip |
Merge branch 'master' of git://github.com/mesos/spark into scala-2.10
Conflicts:
core/src/main/scala/org/apache/spark/SparkContext.scala
project/SparkBuild.scala
Diffstat (limited to 'core/src/test/scala/org')
7 files changed, 119 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala new file mode 100644 index 0000000000..3a7171c488 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.mock.EasyMockSugar + +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.{BlockManager, StorageLevel} + +// TODO: Test the CacheManager's thread-safety aspects +class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar { + var sc : SparkContext = _ + var blockManager: BlockManager = _ + var cacheManager: CacheManager = _ + var split: Partition = _ + /** An RDD which returns the values [1, 2, 3, 4]. */ + var rdd: RDD[Int] = _ + + before { + sc = new SparkContext("local", "test") + blockManager = mock[BlockManager] + cacheManager = new CacheManager(blockManager) + split = new Partition { override def index: Int = 0 } + rdd = new RDD[Int](sc, Nil) { + override def getPartitions: Array[Partition] = Array(split) + override val getDependencies = List[Dependency[_]]() + override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator + } + } + + after { + sc.stop() + } + + test("get uncached rdd") { + expecting { + blockManager.get("rdd_0_0").andReturn(None) + blockManager.put("rdd_0_0", ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, true). + andReturn(0) + } + + whenExecuting(blockManager) { + val context = new TaskContext(0, 0, 0, runningLocally = false, null) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(1, 2, 3, 4)) + } + } + + test("get cached rdd") { + expecting { + blockManager.get("rdd_0_0").andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) + } + + whenExecuting(blockManager) { + val context = new TaskContext(0, 0, 0, runningLocally = false, null) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(5, 6, 7)) + } + } + + test("get uncached local rdd") { + expecting { + // Local computation should not persist the resulting value, so don't expect a put(). + blockManager.get("rdd_0_0").andReturn(None) + } + + whenExecuting(blockManager) { + val context = new TaskContext(0, 0, 0, runningLocally = true, null) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(1, 2, 3, 4)) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 8a869c9005..591c1d498d 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -495,7 +495,7 @@ public class JavaAPISuite implements Serializable { @Test public void iterator() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContext(0, 0, 0, null); + TaskContext context = new TaskContext(0, 0, 0, false, null); Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue()); } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index 58c94a162d..1a9ce8c607 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { val conf = new MetricsConfig(Option("dummy-file")) conf.initialize() - assert(conf.properties.size() === 5) + assert(conf.properties.size() === 4) assert(conf.properties.getProperty("test-for-dummy") === null) val property = conf.getInstance("random") - assert(property.size() === 3) + assert(property.size() === 2) assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(property.getProperty("sink.servlet.uri") === "/metrics/json") - assert(property.getProperty("sink.servlet.sample") === "false") + assert(property.getProperty("sink.servlet.path") === "/metrics/json") } test("MetricsConfig with properties set") { @@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { conf.initialize() val masterProp = conf.getInstance("master") - assert(masterProp.size() === 6) + assert(masterProp.size() === 5) assert(masterProp.getProperty("sink.console.period") === "20") assert(masterProp.getProperty("sink.console.unit") === "minutes") assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource") assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json") - assert(masterProp.getProperty("sink.servlet.sample") === "false") + assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json") val workerProp = conf.getInstance("worker") - assert(workerProp.size() === 6) + assert(workerProp.size() === 5) assert(workerProp.getProperty("sink.console.period") === "10") assert(workerProp.getProperty("sink.console.unit") === "seconds") assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource") assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json") - assert(workerProp.getProperty("sink.servlet.sample") === "false") + assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") } test("MetricsConfig with subProperties") { @@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { assert(consoleProps.size() === 2) val servletProps = sinkProps("servlet") - assert(servletProps.size() === 3) + assert(servletProps.size() === 2) } } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index adc971050e..c1df5e151e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -140,7 +140,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(rdd.union(emptyKv).collect().size === 2) } - test("cogrouped RDDs") { + test("coalesced RDDs") { val data = sc.parallelize(1 to 10, 10) val coalesced1 = data.coalesce(2) @@ -175,8 +175,14 @@ class RDDSuite extends FunSuite with SharedSparkContext { val coalesced5 = data.coalesce(1, shuffle = true) assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] != null) + + // when shuffling, we can increase the number of partitions + val coalesced6 = data.coalesce(20, shuffle = true) + assert(coalesced6.partitions.size === 20) + assert(coalesced6.collect().toSet === (1 to 10).toSet) } - test("cogrouped RDDs with locality") { + + test("coalesced RDDs with locality") { val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b")))) val coal3 = data3.coalesce(3) val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation) @@ -197,11 +203,11 @@ class RDDSuite extends FunSuite with SharedSparkContext { val coalesced4 = data.coalesce(20) val listOfLists = coalesced4.glom().collect().map(_.toList).toList val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) } - assert( sortedList === (1 to 9). + assert(sortedList === (1 to 9). map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back") } - test("cogrouped RDDs with locality, large scale (10K partitions)") { + test("coalesced RDDs with locality, large scale (10K partitions)") { // large scale experiment import collection.mutable val rnd = scala.util.Random diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala index 92ad9f09b2..1b50ce06b3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala @@ -166,7 +166,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val taskSet = new TaskSet(tasks.toArray,0,0,0,null) val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.fairscheduler.allocation.file", xmlPath) + System.setProperty("spark.scheduler.allocation.file", xmlPath) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool) schedulableBuilder.buildPools() @@ -179,13 +179,13 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging assert(rootPool.getSchedulableByName("1").weight === 1) assert(rootPool.getSchedulableByName("2").minShare === 3) assert(rootPool.getSchedulableByName("2").weight === 1) - assert(rootPool.getSchedulableByName("3").minShare === 2) + assert(rootPool.getSchedulableByName("3").minShare === 0) assert(rootPool.getSchedulableByName("3").weight === 1) val properties1 = new Properties() - properties1.setProperty("spark.scheduler.cluster.fair.pool","1") + properties1.setProperty("spark.scheduler.pool","1") val properties2 = new Properties() - properties2.setProperty("spark.scheduler.cluster.fair.pool","2") + properties2.setProperty("spark.scheduler.pool","2") val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet) val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet) diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala index 111340a65c..af76c843e8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala @@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1) new Thread { if (poolName != null) { - sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName) + sc.setLocalProperty("spark.scheduler.pool", poolName) } override def run() { val ans = nums.map(number => { @@ -90,7 +90,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { } test("Local FIFO scheduler end-to-end test") { - System.setProperty("spark.cluster.schedulingmode", "FIFO") + System.setProperty("spark.scheduler.mode", "FIFO") sc = new SparkContext("local[4]", "test") val sem = new Semaphore(0) @@ -150,9 +150,9 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext { test("Local fair scheduler end-to-end test") { sc = new SparkContext("local[8]", "LocalSchedulerSuite") val sem = new Semaphore(0) - System.setProperty("spark.cluster.schedulingmode", "FAIR") + System.setProperty("spark.scheduler.mode", "FAIR") val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.fairscheduler.allocation.file", xmlPath) + System.setProperty("spark.scheduler.allocation.file", xmlPath) createThread(10,"1",sc,sem) TaskThreadInfo.threadToStarted(10).await() diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 3321fb5eb7..07c9f2382b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -24,7 +24,7 @@ import org.eclipse.jetty.server.Server class UISuite extends FunSuite { test("jetty port increases under contention") { - val startPort = 3030 + val startPort = 4040 val server = new Server(startPort) server.start() val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq()) |