aboutsummaryrefslogtreecommitdiff
path: root/extras/kinesis-asl/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-17 16:49:07 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-17 16:49:07 -0700
commitca4257aec658aaa87f4f097dd7534033d5f13ddc (patch)
tree3ec231336161ad38e61c9f6d200aa3bc567a1225 /extras/kinesis-asl/src/test
parent2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c (diff)
downloadspark-ca4257aec658aaa87f4f097dd7534033d5f13ddc.tar.gz
spark-ca4257aec658aaa87f4f097dd7534033d5f13ddc.tar.bz2
spark-ca4257aec658aaa87f4f097dd7534033d5f13ddc.zip
[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] Updates to the Kinesis API
SPARK-6514 - Use correct region SPARK-5960 - Allow AWS Credentials to be directly passed SPARK-6656 - Specify kinesis application name explicitly SPARK-7679 - Upgrade to latest KCL and AWS SDK. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6147 from tdas/kinesis-api-update and squashes the following commits: f23ea77 [Tathagata Das] Updated versions and updated APIs 373b201 [Tathagata Das] Updated Kinesis API
Diffstat (limited to 'extras/kinesis-asl/src/test')
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala15
1 files changed, 12 insertions, 3 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 255fe65819..7c17ee9dce 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -40,6 +40,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorC
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
/**
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
@@ -81,12 +82,20 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
checkpointStateMock, currentClockMock)
}
- test("kinesis utils api") {
+ test("KinesisUtils API") {
val ssc = new StreamingContext(master, framework, batchDuration)
// Tests the API, does not actually test data receiving
- val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+ val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
"https://kinesis.us-west-2.amazonaws.com", Seconds(2),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
+ "awsAccessKey", "awsSecretKey")
+
ssc.stop()
}