aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-02-13 09:53:57 -0800
committerAndrew Or <andrew@databricks.com>2015-02-13 09:55:36 -0800
commit077eec2d9dba197f51004ee4a322d0fa71424ea0 (patch)
treeaad94c050180d590c93e85cd6d691f78a27f9dd3 /extras
parentfc6d3e796a3c600e2f7827562455d555e59775ae (diff)
downloadspark-077eec2d9dba197f51004ee4a322d0fa71424ea0.tar.gz
spark-077eec2d9dba197f51004ee4a322d0fa71424ea0.tar.bz2
spark-077eec2d9dba197f51004ee4a322d0fa71424ea0.zip
[SPARK-5735] Replace uses of EasyMock with Mockito
This patch replaces all uses of EasyMock with Mockito. There are two motivations for this: 1. We should use a single mocking framework in our tests in order to keep things consistent. 2. EasyMock may be responsible for non-deterministic unit test failures due to its Objensis dependency (see SPARK-5626 for more details). Most of these changes are fairly mechanical translations of Mockito code to EasyMock, although I made a small change that strengthens the assertions in one test in KinesisReceiverSuite. Author: Josh Rosen <joshrosen@databricks.com> Closes #4578 from JoshRosen/SPARK-5735-remove-easymock and squashes the following commits: 0ab192b [Josh Rosen] Import sorting plus two minor changes to more closely match old semantics. 977565b [Josh Rosen] Remove EasyMock from build. fae1d8f [Josh Rosen] Remove EasyMock usage in KinesisReceiverSuite. 7cca486 [Josh Rosen] Remove EasyMock usage in MesosSchedulerBackendSuite fc5e94d [Josh Rosen] Remove EasyMock in CacheManagerSuite
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl/pom.xml5
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala263
2 files changed, 127 insertions, 141 deletions
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index c815eda52b..216661b8bc 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -68,11 +68,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymockclassextension</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<scope>test</scope>
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 41dbd64c2b..f56898af02 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer
import scala.collection.JavaConversions.seqAsJavaList
-import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.Seconds
@@ -28,9 +27,11 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.TestSuiteBase
import org.apache.spark.streaming.util.Clock
import org.apache.spark.streaming.util.ManualClock
+
+import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
-import org.scalatest.mock.EasyMockSugar
+import org.scalatest.mock.MockitoSugar
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
@@ -42,10 +43,10 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
/**
- * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
+ * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
*/
class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
- with EasyMockSugar {
+ with MockitoSugar {
val app = "TestKinesisReceiver"
val stream = "mySparkStream"
@@ -73,6 +74,14 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
currentClockMock = mock[Clock]
}
+ override def afterFunction(): Unit = {
+ super.afterFunction()
+ // Since this suite was originally written using EasyMock, add this to preserve the old
+ // mocking semantics (see SPARK-5735 for more details)
+ verifyNoMoreInteractions(receiverMock, checkpointerMock, checkpointClockMock,
+ checkpointStateMock, currentClockMock)
+ }
+
test("kinesis utils api") {
val ssc = new StreamingContext(master, framework, batchDuration)
// Tests the API, does not actually test data receiving
@@ -83,193 +92,175 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}
test("process records including store and checkpoint") {
- val expectedCheckpointIntervalMillis = 10
- expecting {
- receiverMock.isStopped().andReturn(false).once()
- receiverMock.store(record1.getData().array()).once()
- receiverMock.store(record2.getData().array()).once()
- checkpointStateMock.shouldCheckpoint().andReturn(true).once()
- checkpointerMock.checkpoint().once()
- checkpointStateMock.advanceCheckpoint().once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).store(record1.getData().array())
+ verify(receiverMock, times(1)).store(record2.getData().array())
+ verify(checkpointStateMock, times(1)).shouldCheckpoint()
+ verify(checkpointerMock, times(1)).checkpoint()
+ verify(checkpointStateMock, times(1)).advanceCheckpoint()
}
test("shouldn't store and checkpoint when receiver is stopped") {
- expecting {
- receiverMock.isStopped().andReturn(true).once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(true)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
}
test("shouldn't checkpoint when exception occurs during store") {
- expecting {
- receiverMock.isStopped().andReturn(false).once()
- receiverMock.store(record1.getData().array()).andThrow(new RuntimeException()).once()
- }
- whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
- intercept[RuntimeException] {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException())
+
+ intercept[RuntimeException] {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
}
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).store(record1.getData().array())
}
test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
+ when(currentClockMock.currentTime()).thenReturn(0)
+
val checkpointIntervalMillis = 10
- val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+ val checkpointState =
+ new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
- }
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("should checkpoint if we have exceeded the checkpoint interval") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
- assert(checkpointState.shouldCheckpoint())
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
+ assert(checkpointState.shouldCheckpoint())
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
- assert(!checkpointState.shouldCheckpoint())
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
+ assert(!checkpointState.shouldCheckpoint())
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("should add to time when advancing checkpoint") {
- expecting {
- currentClockMock.currentTime().andReturn(0).once()
- }
- whenExecuting(currentClockMock) {
- val checkpointIntervalMillis = 10
- val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
- checkpointState.advanceCheckpoint()
- assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
- }
+ when(currentClockMock.currentTime()).thenReturn(0)
+
+ val checkpointIntervalMillis = 10
+ val checkpointState =
+ new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+ assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ checkpointState.advanceCheckpoint()
+ assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+
+ verify(currentClockMock, times(1)).currentTime()
}
test("shutdown should checkpoint if the reason is TERMINATE") {
- expecting {
- checkpointerMock.checkpoint().once()
- }
- whenExecuting(checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- val reason = ShutdownReason.TERMINATE
- recordProcessor.shutdown(checkpointerMock, reason)
- }
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ val reason = ShutdownReason.TERMINATE
+ recordProcessor.shutdown(checkpointerMock, reason)
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
- expecting {
- }
- whenExecuting(checkpointerMock, checkpointStateMock) {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
- checkpointStateMock)
- recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
- recordProcessor.shutdown(checkpointerMock, null)
- }
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+ recordProcessor.shutdown(checkpointerMock, null)
+
+ verify(checkpointerMock, never()).checkpoint()
}
test("retry success on first attempt") {
val expectedIsStopped = false
- expecting {
- receiverMock.isStopped().andReturn(expectedIsStopped).once()
- }
- whenExecuting(receiverMock) {
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
- }
+ when(receiverMock.isStopped()).thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(1)).isStopped()
}
test("retry success on second attempt after a Kinesis throttling exception") {
val expectedIsStopped = false
- expecting {
- receiverMock.isStopped().andThrow(new ThrottlingException("error message"))
- .andReturn(expectedIsStopped).once()
- }
- whenExecuting(receiverMock) {
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
- }
+ when(receiverMock.isStopped())
+ .thenThrow(new ThrottlingException("error message"))
+ .thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(2)).isStopped()
}
test("retry success on second attempt after a Kinesis dependency exception") {
val expectedIsStopped = false
- expecting {
- receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message"))
- .andReturn(expectedIsStopped).once()
- }
- whenExecuting(receiverMock) {
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
- }
+ when(receiverMock.isStopped())
+ .thenThrow(new KinesisClientLibDependencyException("error message"))
+ .thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(2)).isStopped()
}
test("retry failed after a shutdown exception") {
- expecting {
- checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once()
- }
- whenExecuting(checkpointerMock) {
- intercept[ShutdownException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
+ when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))
+
+ intercept[ShutdownException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("retry failed after an invalid state exception") {
- expecting {
- checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once()
- }
- whenExecuting(checkpointerMock) {
- intercept[InvalidStateException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
+ when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))
+
+ intercept[InvalidStateException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("retry failed after unexpected exception") {
- expecting {
- checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once()
- }
- whenExecuting(checkpointerMock) {
- intercept[RuntimeException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
+ when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))
+
+ intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+
+ verify(checkpointerMock, times(1)).checkpoint()
}
test("retry failed after exhausing all retries") {
val expectedErrorMessage = "final try error message"
- expecting {
- checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message"))
- .andThrow(new ThrottlingException(expectedErrorMessage)).once()
- }
- whenExecuting(checkpointerMock) {
- val exception = intercept[RuntimeException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
- exception.getMessage().shouldBe(expectedErrorMessage)
+ when(checkpointerMock.checkpoint())
+ .thenThrow(new ThrottlingException("error message"))
+ .thenThrow(new ThrottlingException(expectedErrorMessage))
+
+ val exception = intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
+ exception.getMessage().shouldBe(expectedErrorMessage)
+
+ verify(checkpointerMock, times(2)).checkpoint()
}
}