aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-02-13 09:53:57 -0800
committerAndrew Or <andrew@databricks.com>2015-02-13 09:55:36 -0800
commit077eec2d9dba197f51004ee4a322d0fa71424ea0 (patch)
treeaad94c050180d590c93e85cd6d691f78a27f9dd3
parentfc6d3e796a3c600e2f7827562455d555e59775ae (diff)
downloadspark-077eec2d9dba197f51004ee4a322d0fa71424ea0.tar.gz
spark-077eec2d9dba197f51004ee4a322d0fa71424ea0.tar.bz2
spark-077eec2d9dba197f51004ee4a322d0fa71424ea0.zip
[SPARK-5735] Replace uses of EasyMock with Mockito
This patch replaces all uses of EasyMock with Mockito. There are two motivations for this: 1. We should use a single mocking framework in our tests in order to keep things consistent. 2. EasyMock may be responsible for non-deterministic unit test failures due to its Objensis dependency (see SPARK-5626 for more details). Most of these changes are fairly mechanical translations of Mockito code to EasyMock, although I made a small change that strengthens the assertions in one test in KinesisReceiverSuite. Author: Josh Rosen <joshrosen@databricks.com> Closes #4578 from JoshRosen/SPARK-5735-remove-easymock and squashes the following commits: 0ab192b [Josh Rosen] Import sorting plus two minor changes to more closely match old semantics. 977565b [Josh Rosen] Remove EasyMock from build. fae1d8f [Josh Rosen] Remove EasyMock usage in KinesisReceiverSuite. 7cca486 [Josh Rosen] Remove EasyMock usage in MesosSchedulerBackendSuite fc5e94d [Josh Rosen] Remove EasyMock in CacheManagerSuite
-rw-r--r--core/pom.xml10
-rw-r--r--core/src/test/scala/org/apache/spark/CacheManagerSuite.scala42
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala125
-rw-r--r--extras/kinesis-asl/pom.xml5
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala263
-rw-r--r--pom.xml13
6 files changed, 207 insertions, 251 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 66180035e6..c993781c0e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -330,16 +330,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymockclassextension</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index d7d9dc7b50..4b25c200a6 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -17,16 +17,18 @@
package org.apache.spark
+import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.mock.EasyMockSugar
+import org.scalatest.mock.MockitoSugar
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
// TODO: Test the CacheManager's thread-safety aspects
-class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
- var sc : SparkContext = _
+class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
+ with MockitoSugar {
+
var blockManager: BlockManager = _
var cacheManager: CacheManager = _
var split: Partition = _
@@ -57,10 +59,6 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
}.cache()
}
- after {
- sc.stop()
- }
-
test("get uncached rdd") {
// Do not mock this test, because attempting to match Array[Any], which is not covariant,
// in blockManager.put is a losing battle. You have been warned.
@@ -75,29 +73,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
}
test("get cached rdd") {
- expecting {
- val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
- blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
- }
+ val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+ when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
- whenExecuting(blockManager) {
- val context = new TaskContextImpl(0, 0, 0, 0)
- val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- assert(value.toList === List(5, 6, 7))
- }
+ val context = new TaskContextImpl(0, 0, 0, 0)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(5, 6, 7))
}
test("get uncached local rdd") {
- expecting {
- // Local computation should not persist the resulting value, so don't expect a put().
- blockManager.get(RDDBlockId(0, 0)).andReturn(None)
- }
+ // Local computation should not persist the resulting value, so don't expect a put().
+ when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
- whenExecuting(blockManager) {
- val context = new TaskContextImpl(0, 0, 0, 0, true)
- val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- assert(value.toList === List(1, 2, 3, 4))
- }
+ val context = new TaskContextImpl(0, 0, 0, 0, true)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(1, 2, 3, 4))
}
test("verify task metrics updated correctly") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 46ab02bfef..8cd302e2b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -17,45 +17,47 @@
package org.apache.spark.scheduler.mesos
-import org.apache.spark.executor.MesosExecutorBackend
-import org.scalatest.FunSuite
-import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
-import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
- TaskDescription, WorkerOffer, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
-import org.apache.mesos.SchedulerDriver
-import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _}
-import org.apache.mesos.Protos.Value.Scalar
-import org.easymock.{Capture, EasyMock}
import java.nio.ByteBuffer
-import java.util.Collections
import java.util
-import org.scalatest.mock.EasyMockSugar
+import java.util.Collections
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
+import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
+import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerBackend, MemoryUtils}
+
+class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {
test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home" , "/mesos-home")
- val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
- EasyMock.replay(listenerBus)
-
- val sc = EasyMock.createMock(classOf[SparkContext])
- EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes()
- EasyMock.expect(sc.conf).andReturn(conf).anyTimes()
- EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
- EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
- EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
- EasyMock.replay(sc)
- val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
- EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
- EasyMock.replay(taskScheduler)
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val sc = mock[SparkContext]
+ when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+ when(sc.conf).thenReturn(conf)
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.listenerBus).thenReturn(listenerBus)
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
@@ -84,20 +86,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
}
- val driver = EasyMock.createMock(classOf[SchedulerDriver])
- val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
- val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
- EasyMock.replay(listenerBus)
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
- val sc = EasyMock.createMock(classOf[SparkContext])
- EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
- EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
- EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
- EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
- EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
- EasyMock.replay(sc)
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(new SparkConf)
+ when(sc.listenerBus).thenReturn(listenerBus)
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val minCpu = 4
@@ -121,25 +122,29 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
2
))
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
- EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
- EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
- EasyMock.replay(taskScheduler)
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
- val capture = new Capture[util.Collection[TaskInfo]]
- EasyMock.expect(
+ val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+ when(
driver.launchTasks(
- EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
- EasyMock.capture(capture),
- EasyMock.anyObject(classOf[Filters])
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
)
- ).andReturn(Status.valueOf(1)).once
- EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
- EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
- EasyMock.replay(driver)
+ ).thenReturn(Status.valueOf(1))
+ when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
+ when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))
backend.resourceOffers(driver, mesosOffers)
- EasyMock.verify(driver)
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+ verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
+ verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
assert(capture.getValue.size() == 1)
val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))
@@ -151,15 +156,13 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
// Unwanted resources offered on an existing node. Make sure they are declined
val mesosOffers2 = new java.util.ArrayList[Offer]
mesosOffers2.add(createOffer(1, minMem, minCpu))
- EasyMock.reset(taskScheduler)
- EasyMock.reset(driver)
- EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq())))
- EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
- EasyMock.replay(taskScheduler)
- EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
- EasyMock.replay(driver)
+ reset(taskScheduler)
+ reset(driver)
+ when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+ when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
backend.resourceOffers(driver, mesosOffers2)
- EasyMock.verify(driver)
+ verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
}
}
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index c815eda52b..216661b8bc 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -68,11 +68,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymockclassextension</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<scope>test</scope>
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 41dbd64c2b..f56898af02 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer
import scala.collection.JavaConversions.seqAsJavaList
-import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.Seconds
@@ -28,9 +27,11 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.TestSuiteBase
import org.apache.spark.streaming.util.Clock
import org.apache.spark.streaming.util.ManualClock
+
+import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
-import org.scalatest.mock.EasyMockSugar
+import org.scalatest.mock.MockitoSugar
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
@@ -42,10 +43,10 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
/**
- * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
+ * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
*/
class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
- with EasyMockSugar {
+ with MockitoSugar {
val app = "TestKinesisReceiver"
val stream = "mySparkStream"
@@ -73,6 +74,14 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
currentClockMock = mock[Clock]
}
+ override def afterFunction(): Unit = {
+ super.afterFunction()
+ // Since this suite was originally written using EasyMock, add this to preserve the old
+ // mocking semantics (see SPARK-5735 for more details)
+ verifyNoMoreInteractions(receiverMock, checkpointerMock, checkpointClockMock,
+ checkpointStateMock, currentClockMock)
+ }
+
test("kinesis utils api") {
val ssc = new StreamingContext(master, framework, batchDuration)
// Tests the API, does not actually test data receiving
@@ -83,193 +92,175 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}
test("process records including store and checkpoint") {
- val expectedCheckpointIntervalMillis = 10
- expecting {
- receiverMock.isStopped().andReturn(false).once()
- receiverMock.store(record1.getData().array()).once()
- receiverMock.store(record2.getData().array()).once()
- checkpointStateMock.shouldCheckpoint().andReturn(true).once()
- checkpointerMock.checkpoint().once()
- checkpointStateMock.advanceCheckpoint().once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).store(record1.getData().array())
+ verify(receiverMock, times(1)).store(record2.getData().array())
+ verify(checkpointStateMock, times(1)).shouldCheckpoint()
+ verify(checkpointerMock, times(1)).checkpoint()
+ verify(checkpointStateMock, times(1)).advanceCheckpoint()
}
test("shouldn't store and checkpoint when receiver is stopped") {
- expecting {
- receiverMock.isStopped().andReturn(true).once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(true)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
}
test("shouldn't checkpoint when exception occurs during store") {
- expecting {
- receiverMock.isStopped().andReturn(false).once()
- receiverMock.store(record1.getData().array()).andThrow(new RuntimeException()).once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- intercept[RuntimeException] {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException())
+
+ intercept[RuntimeException] {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
}
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).store(record1.getData().array())
}
test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
+ when(currentClockMock.currentTime()).thenReturn(0)
+
val checkpointIntervalMillis = 10
- val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+ val checkpointState =
+ new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
- }
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("should checkpoint if we have exceeded the checkpoint interval") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
- assert(checkpointState.shouldCheckpoint())
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
+ assert(checkpointState.shouldCheckpoint())
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
- assert(!checkpointState.shouldCheckpoint())
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
+ assert(!checkpointState.shouldCheckpoint())
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("should add to time when advancing checkpoint") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointIntervalMillis = 10
- val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
- checkpointState.advanceCheckpoint()
- assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointIntervalMillis = 10
+ val checkpointState =
+ new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+ assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ checkpointState.advanceCheckpoint()
+ assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("shutdown should checkpoint if the reason is TERMINATE") {
- expecting {
- checkpointerMock.checkpoint().once()
- }
- whenExecuting(checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- val reason = ShutdownReason.TERMINATE
- recordProcessor.shutdown(checkpointerMock, reason)
- }
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ val reason = ShutdownReason.TERMINATE
+ recordProcessor.shutdown(checkpointerMock, reason)
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
- expecting {
- }
- whenExecuting(checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
- recordProcessor.shutdown(checkpointerMock, null)
- }
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+ recordProcessor.shutdown(checkpointerMock, null)
+
+ verify(checkpointerMock, never()).checkpoint()
}
test("retry success on first attempt") {
val expectedIsStopped = false
- expecting {
- receiverMock.isStopped().andReturn(expectedIsStopped).once()
- }
- whenExecuting(receiverMock) {
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
- }
+ when(receiverMock.isStopped()).thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(1)).isStopped()
}
test("retry success on second attempt after a Kinesis throttling exception") {
val expectedIsStopped = false
- expecting {
- receiverMock.isStopped().andThrow(new ThrottlingException("error message"))
- .andReturn(expectedIsStopped).once()
- }
- whenExecuting(receiverMock) {
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
- }
+ when(receiverMock.isStopped())
+ .thenThrow(new ThrottlingException("error message"))
+ .thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(2)).isStopped()
}
test("retry success on second attempt after a Kinesis dependency exception") {
val expectedIsStopped = false
- expecting {
- receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message"))
- .andReturn(expectedIsStopped).once()
- }
- whenExecuting(receiverMock) {
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
- }
+ when(receiverMock.isStopped())
+ .thenThrow(new KinesisClientLibDependencyException("error message"))
+ .thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(2)).isStopped()
}
test("retry failed after a shutdown exception") {
- expecting {
- checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once()
- }
- whenExecuting(checkpointerMock) {
- intercept[ShutdownException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
+ when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))
+
+ intercept[ShutdownException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("retry failed after an invalid state exception") {
- expecting {
- checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once()
- }
- whenExecuting(checkpointerMock) {
- intercept[InvalidStateException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
+ when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))
+
+ intercept[InvalidStateException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("retry failed after unexpected exception") {
- expecting {
- checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once()
- }
- whenExecuting(checkpointerMock) {
- intercept[RuntimeException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
+ when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))
+
+ intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("retry failed after exhausing all retries") {
val expectedErrorMessage = "final try error message"
- expecting {
- checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message"))
- .andThrow(new ThrottlingException(expectedErrorMessage)).once()
- }
- whenExecuting(checkpointerMock) {
- val exception = intercept[RuntimeException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
- exception.getMessage().shouldBe(expectedErrorMessage)
+ when(checkpointerMock.checkpoint())
+ .thenThrow(new ThrottlingException("error message"))
+ .thenThrow(new ThrottlingException(expectedErrorMessage))
+
+ val exception = intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+ exception.getMessage().shouldBe(expectedErrorMessage)
+
+ verify(checkpointerMock, times(2)).checkpoint()
}
}
diff --git a/pom.xml b/pom.xml
index 56e37d4226..53372d5cfc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -620,19 +620,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymockclassextension</artifactId>
- <version>3.1</version>
- <scope>test</scope>
- </dependency>
- <!-- Needed by cglib which is needed by easymock. -->
- <dependency>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- <version>3.3.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.0</version>