author     Chris Fregly <chris@fregly.com>                 2014-08-02 13:35:35 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>     2014-08-02 13:35:35 -0700
commit     91f9504e6086fac05b40545099f9818949c24bca (patch)
tree       c79c63f0b3f82c4c9b632072f384b85bc7f646f1 /extras/kinesis-asl/src/test
parent     67bd8e3c217a80c3117a6e3853aa60fe13d08c91 (diff)
[SPARK-1981] Add AWS Kinesis streaming support
Author: Chris Fregly <chris@fregly.com>

Closes #1434 from cfregly/master and squashes the following commits:

4774581 [Chris Fregly] updated docs, renamed retry to retryRandom to be more clear, removed retries around store() method
0393795 [Chris Fregly] moved Kinesis examples out of examples/ and back into extras/kinesis-asl
691a6be [Chris Fregly] fixed tests and formatting, fixed a bug with JavaKinesisWordCount during union of streams
0e1c67b [Chris Fregly] Merge remote-tracking branch 'upstream/master'
74e5c7c [Chris Fregly] updated per TD's feedback. simplified examples, updated docs
e33cbeb [Chris Fregly] Merge remote-tracking branch 'upstream/master'
bf614e9 [Chris Fregly] per matei's feedback: moved the kinesis examples into the examples/ dir
d17ca6d [Chris Fregly] per TD's feedback: updated docs, simplified the KinesisUtils api
912640c [Chris Fregly] changed the foundKinesis class to be a publically-avail class
db3eefd [Chris Fregly] Merge remote-tracking branch 'upstream/master'
21de67f [Chris Fregly] Merge remote-tracking branch 'upstream/master'
6c39561 [Chris Fregly] parameterized the versions of the aws java sdk and kinesis client
338997e [Chris Fregly] improve build docs for kinesis
828f8ae [Chris Fregly] more cleanup
e7c8978 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
cd68c0d [Chris Fregly] fixed typos and backward compatibility
d18e680 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
b3b0ff1 [Chris Fregly] [SPARK-1981] Add AWS Kinesis streaming support
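The tests in this change exercise the new KinesisUtils.createStream API. For orientation, a caller would use it roughly as in the following minimal Scala sketch; the application name, master, stream name, and endpoint are placeholders taken from the tests below, not a prescribed configuration:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object KinesisStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KinesisSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Create a DStream of raw byte arrays from the Kinesis stream "mySparkStream",
    // reading from the latest position and replicating received blocks to two nodes.
    val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
      "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)

    kinesisStream.map(bytes => new String(bytes)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}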
Diffstat (limited to 'extras/kinesis-asl/src/test')
-rw-r--r--  extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java    41
-rw-r--r--  extras/kinesis-asl/src/test/resources/log4j.properties                                              26
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala   275
3 files changed, 342 insertions, 0 deletions
diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
new file mode 100644
index 0000000000..87954a31f6
--- /dev/null
+++ b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Demonstrate the use of the KinesisUtils Java API
+ */
+public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testKinesisStream() {
+ // Tests the API, does not actually test data receiving
+ JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+
+ ssc.stop();
+ }
+}
diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..e01e049595
--- /dev/null
+++ b/extras/kinesis-asl/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
new file mode 100644
index 0000000000..41dbd64c2b
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions.seqAsJavaList
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.TestSuiteBase
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.ManualClock
+import org.scalatest.BeforeAndAfter
+import org.scalatest.Matchers
+import org.scalatest.mock.EasyMockSugar
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+/**
+ * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
+ */
+class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
+ with EasyMockSugar {
+
+ val app = "TestKinesisReceiver"
+ val stream = "mySparkStream"
+ val endpoint = "endpoint-url"
+ val workerId = "dummyWorkerId"
+ val shardId = "dummyShardId"
+
+ val record1 = new Record()
+ record1.setData(ByteBuffer.wrap("Spark In Action".getBytes()))
+ val record2 = new Record()
+ record2.setData(ByteBuffer.wrap("Learning Spark".getBytes()))
+ val batch = List[Record](record1, record2)
+
+ var receiverMock: KinesisReceiver = _
+ var checkpointerMock: IRecordProcessorCheckpointer = _
+ var checkpointClockMock: ManualClock = _
+ var checkpointStateMock: KinesisCheckpointState = _
+ var currentClockMock: Clock = _
+
+ override def beforeFunction() = {
+ receiverMock = mock[KinesisReceiver]
+ checkpointerMock = mock[IRecordProcessorCheckpointer]
+ checkpointClockMock = mock[ManualClock]
+ checkpointStateMock = mock[KinesisCheckpointState]
+ currentClockMock = mock[Clock]
+ }
+
+ test("kinesis utils api") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ // Tests the API, does not actually test data receiving
+ val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
+ ssc.stop()
+ }
+
+ test("process records including store and checkpoint") {
+ val expectedCheckpointIntervalMillis = 10
+ expecting {
+ receiverMock.isStopped().andReturn(false).once()
+ receiverMock.store(record1.getData().array()).once()
+ receiverMock.store(record2.getData().array()).once()
+ checkpointStateMock.shouldCheckpoint().andReturn(true).once()
+ checkpointerMock.checkpoint().once()
+ checkpointStateMock.advanceCheckpoint().once()
+ }
+ whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+ }
+ }
+
+ test("shouldn't store and checkpoint when receiver is stopped") {
+ expecting {
+ receiverMock.isStopped().andReturn(true).once()
+ }
+ whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+ }
+ }
+
+ test("shouldn't checkpoint when exception occurs during store") {
+ expecting {
+ receiverMock.isStopped().andReturn(false).once()
+ receiverMock.store(record1.getData().array()).andThrow(new RuntimeException()).once()
+ }
+ whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+ intercept[RuntimeException] {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+ }
+ }
+ }
+
+ test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointIntervalMillis = 10
+ val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+ assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ }
+ }
+
+ test("should checkpoint if we have exceeded the checkpoint interval") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
+ assert(checkpointState.shouldCheckpoint())
+ }
+ }
+
+ test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
+ assert(!checkpointState.shouldCheckpoint())
+ }
+ }
+
+ test("should add to time when advancing checkpoint") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointIntervalMillis = 10
+ val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+ assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ checkpointState.advanceCheckpoint()
+ assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+ }
+ }
+
+ test("shutdown should checkpoint if the reason is TERMINATE") {
+ expecting {
+ checkpointerMock.checkpoint().once()
+ }
+ whenExecuting(checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ val reason = ShutdownReason.TERMINATE
+ recordProcessor.shutdown(checkpointerMock, reason)
+ }
+ }
+
+ test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
+ expecting {
+ }
+ whenExecuting(checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+ recordProcessor.shutdown(checkpointerMock, null)
+ }
+ }
+
+ test("retry success on first attempt") {
+ val expectedIsStopped = false
+ expecting {
+ receiverMock.isStopped().andReturn(expectedIsStopped).once()
+ }
+ whenExecuting(receiverMock) {
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+ }
+ }
+
+ test("retry success on second attempt after a Kinesis throttling exception") {
+ val expectedIsStopped = false
+ expecting {
+ receiverMock.isStopped().andThrow(new ThrottlingException("error message"))
+ .andReturn(expectedIsStopped).once()
+ }
+ whenExecuting(receiverMock) {
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+ }
+ }
+
+ test("retry success on second attempt after a Kinesis dependency exception") {
+ val expectedIsStopped = false
+ expecting {
+ receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message"))
+ .andReturn(expectedIsStopped).once()
+ }
+ whenExecuting(receiverMock) {
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+ }
+ }
+
+ test("retry failed after a shutdown exception") {
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once()
+ }
+ whenExecuting(checkpointerMock) {
+ intercept[ShutdownException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ }
+ }
+
+ test("retry failed after an invalid state exception") {
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once()
+ }
+ whenExecuting(checkpointerMock) {
+ intercept[InvalidStateException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ }
+ }
+
+ test("retry failed after unexpected exception") {
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once()
+ }
+ whenExecuting(checkpointerMock) {
+ intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ }
+ }
+
+ test("retry failed after exhausing all retries") {
+ val expectedErrorMessage = "final try error message"
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message"))
+ .andThrow(new ThrottlingException(expectedErrorMessage)).once()
+ }
+ whenExecuting(checkpointerMock) {
+ val exception = intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ exception.getMessage().shouldBe(expectedErrorMessage)
+ }
+ }
+}
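The retry tests above pin down the contract of KinesisRecordProcessor.retryRandom: transient Kinesis ThrottlingException and KinesisClientLibDependencyException failures are retried with a randomized back-off, all other exceptions (ShutdownException, InvalidStateException, anything unexpected) propagate immediately, and once the retries are exhausted the last exception is rethrown. A minimal sketch of a helper with that contract, for illustration only (this is not the actual Spark implementation, and the object name RetryRandomSketch is made up):

import scala.util.Random

import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException

object RetryRandomSketch {
  /** Evaluate `expression`, retrying up to `numRetries` times on transient Kinesis errors. */
  def retryRandom[T](expression: => T, numRetries: Int, maxBackOffMillis: Int): T = {
    try {
      expression
    } catch {
      // Throttling and dependency exceptions are transient and worth retrying;
      // ShutdownException, InvalidStateException, and anything else propagates as-is.
      case e @ (_: ThrottlingException | _: KinesisClientLibDependencyException) =>
        if (numRetries > 1) {
          Thread.sleep(Random.nextInt(maxBackOffMillis))  // randomized back-off
          retryRandom(expression, numRetries - 1, maxBackOffMillis)
        } else {
          throw e  // retries exhausted: surface the last exception
        }
    }
  }
}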