diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/pom.xml | 10 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 42 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala | 125 |
3 files changed, 80 insertions, 97 deletions
diff --git a/core/pom.xml b/core/pom.xml index c86d118c1c..aca0f58865 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -330,16 +330,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymockclassextension</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index d7d9dc7b50..4b25c200a6 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -17,16 +17,18 @@ package org.apache.spark +import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.mock.EasyMockSugar +import org.scalatest.mock.MockitoSugar -import org.apache.spark.executor.{DataReadMethod, TaskMetrics} +import org.apache.spark.executor.DataReadMethod import org.apache.spark.rdd.RDD import org.apache.spark.storage._ // TODO: Test the CacheManager's thread-safety aspects -class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar { - var sc : SparkContext = _ +class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter + with MockitoSugar { + var blockManager: BlockManager = _ var cacheManager: CacheManager = _ var split: Partition = _ @@ -57,10 +59,6 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar }.cache() } - after { - sc.stop() - } - test("get uncached rdd") { // Do not mock this test, because attempting to match Array[Any], which is not covariant, // in blockManager.put is a losing battle. You have been warned. @@ -75,29 +73,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } test("get cached rdd") { - expecting { - val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12) - blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result)) - } + val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12) + when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result)) - whenExecuting(blockManager) { - val context = new TaskContextImpl(0, 0, 0, 0) - val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - assert(value.toList === List(5, 6, 7)) - } + val context = new TaskContextImpl(0, 0, 0, 0) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(5, 6, 7)) } test("get uncached local rdd") { - expecting { - // Local computation should not persist the resulting value, so don't expect a put(). - blockManager.get(RDDBlockId(0, 0)).andReturn(None) - } + // Local computation should not persist the resulting value, so don't expect a put(). + when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None) - whenExecuting(blockManager) { - val context = new TaskContextImpl(0, 0, 0, 0, true) - val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - assert(value.toList === List(1, 2, 3, 4)) - } + val context = new TaskContextImpl(0, 0, 0, 0, true) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(1, 2, 3, 4)) } test("verify task metrics updated correctly") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 46ab02bfef..8cd302e2b4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -17,45 +17,47 @@ package org.apache.spark.scheduler.mesos -import org.apache.spark.executor.MesosExecutorBackend -import org.scalatest.FunSuite -import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} -import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus, - TaskDescription, WorkerOffer, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend} -import org.apache.mesos.SchedulerDriver -import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _} -import org.apache.mesos.Protos.Value.Scalar -import org.easymock.{Capture, EasyMock} import java.nio.ByteBuffer -import java.util.Collections import java.util -import org.scalatest.mock.EasyMockSugar +import java.util.Collections import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { +import org.apache.mesos.SchedulerDriver +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.Scalar +import org.mockito.Mockito._ +import org.mockito.Matchers._ +import org.mockito.{ArgumentCaptor, Matchers} +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext} +import org.apache.spark.executor.MesosExecutorBackend +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerBackend, MemoryUtils} + +class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar { test("check spark-class location correctly") { val conf = new SparkConf conf.set("spark.mesos.executor.home" , "/mesos-home") - val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - EasyMock.replay(listenerBus) - - val sc = EasyMock.createMock(classOf[SparkContext]) - EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes() - EasyMock.expect(sc.conf).andReturn(conf).anyTimes() - EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() - EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() - EasyMock.expect(sc.listenerBus).andReturn(listenerBus) - EasyMock.replay(sc) - val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) - EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() - EasyMock.replay(taskScheduler) + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.getSparkHome()).thenReturn(Option("/spark-home")) + + when(sc.conf).thenReturn(conf) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.executorMemory).thenReturn(100) + when(sc.listenerBus).thenReturn(listenerBus) + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") @@ -84,20 +86,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() } - val driver = EasyMock.createMock(classOf[SchedulerDriver]) - val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] - val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - EasyMock.replay(listenerBus) + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - val sc = EasyMock.createMock(classOf[SparkContext]) - EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() - EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() - EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() - EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes() - EasyMock.expect(sc.listenerBus).andReturn(listenerBus) - EasyMock.replay(sc) + val sc = mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(new SparkConf) + when(sc.listenerBus).thenReturn(listenerBus) val minMem = MemoryUtils.calculateTotalMemory(sc).toInt val minCpu = 4 @@ -121,25 +122,29 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea 2 )) val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) - EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc))) - EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() - EasyMock.replay(taskScheduler) + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - val capture = new Capture[util.Collection[TaskInfo]] - EasyMock.expect( + val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) + when( driver.launchTasks( - EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), - EasyMock.capture(capture), - EasyMock.anyObject(classOf[Filters]) + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) ) - ).andReturn(Status.valueOf(1)).once - EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1) - EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1) - EasyMock.replay(driver) + ).thenReturn(Status.valueOf(1)) + when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1)) + when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1)) backend.resourceOffers(driver, mesosOffers) - EasyMock.verify(driver) + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId) + verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId) assert(capture.getValue.size() == 1) val taskInfo = capture.getValue.iterator().next() assert(taskInfo.getName.equals("n1")) @@ -151,15 +156,13 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea // Unwanted resources offered on an existing node. Make sure they are declined val mesosOffers2 = new java.util.ArrayList[Offer] mesosOffers2.add(createOffer(1, minMem, minCpu)) - EasyMock.reset(taskScheduler) - EasyMock.reset(driver) - EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq()))) - EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() - EasyMock.replay(taskScheduler) - EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1) - EasyMock.replay(driver) + reset(taskScheduler) + reset(driver) + when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1)) backend.resourceOffers(driver, mesosOffers2) - EasyMock.verify(driver) + verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId) } } |