From ec68d8f8cfdede8a0de1d56476205158544cc4eb Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Thu, 13 Apr 2017 08:49:19 +0100 Subject: [SPARK-20189][DSTREAM] Fix spark kinesis testcases to remove deprecated createStream and use Builders ## What changes were proposed in this pull request? The spark-kinesis testcases use the KinesisUtils.createStream which are deprecated now. Modify the testcases to use the recommended KinesisInputDStream.builder instead. This change will also enable the testcases to automatically use the session tokens automatically. ## How was this patch tested? All the existing testcases work fine as expected with the changes. https://issues.apache.org/jira/browse/SPARK-20189 Author: Yash Sharma Closes #17506 from yssharma/ysharma/cleanup_kinesis_testcases. --- .../streaming/kinesis/KinesisInputDStream.scala | 2 +- .../streaming/kinesis/KinesisStreamSuite.scala | 58 ++++++++++++++-------- 2 files changed, 38 insertions(+), 22 deletions(-) (limited to 'external') diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index 8970ad2baf..77553412ed 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -267,7 +267,7 @@ object KinesisInputDStream { getRequiredParam(checkpointAppName, "checkpointAppName"), checkpointInterval.getOrElse(ssc.graph.batchDuration), storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL), - handler, + ssc.sc.clean(handler), kinesisCredsProvider.getOrElse(DefaultCredentials), dynamoDBCredsProvider, cloudWatchCredsProvider) diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index ed7e358050..341a6898cb 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -22,7 +22,6 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Random -import com.amazonaws.regions.RegionUtils import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.Record import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} @@ -173,11 +172,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun * and you have to set the system environment variable RUN_KINESIS_TESTS=1 . */ testIfEnabled("basic operation") { - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) + val stream = KinesisInputDStream.builder.streamingContext(ssc) + .checkpointAppName(appName) + .streamName(testUtils.streamName) + .endpointUrl(testUtils.endpointUrl) + .regionName(testUtils.regionName) + .initialPositionInStream(InitialPositionInStream.LATEST) + .checkpointInterval(Seconds(10)) + .storageLevel(StorageLevel.MEMORY_ONLY) + .build() val collected = new mutable.HashSet[Int] stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => @@ -198,12 +201,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun } testIfEnabled("custom message handling") { - val awsCredentials = KinesisTestUtils.getAWSCredentials() def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 - val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, addFive(_), - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) + + val stream = KinesisInputDStream.builder.streamingContext(ssc) + .checkpointAppName(appName) + .streamName(testUtils.streamName) + .endpointUrl(testUtils.endpointUrl) + .regionName(testUtils.regionName) + .initialPositionInStream(InitialPositionInStream.LATEST) + .checkpointInterval(Seconds(10)) + .storageLevel(StorageLevel.MEMORY_ONLY) + .buildWithMessageHandler(addFive(_)) stream shouldBe a [ReceiverInputDStream[_]] @@ -233,11 +241,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun val localTestUtils = new KPLBasedKinesisTestUtils(1) localTestUtils.createStream() try { - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, localAppName, localTestUtils.streamName, - localTestUtils.endpointUrl, localTestUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) + val stream = KinesisInputDStream.builder.streamingContext(ssc) + .checkpointAppName(localAppName) + .streamName(localTestUtils.streamName) + .endpointUrl(localTestUtils.endpointUrl) + .regionName(localTestUtils.regionName) + .initialPositionInStream(InitialPositionInStream.LATEST) + .checkpointInterval(Seconds(10)) + .storageLevel(StorageLevel.MEMORY_ONLY) + .build() val collected = new mutable.HashSet[Int] stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => @@ -303,13 +315,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun ssc = new StreamingContext(sc, Milliseconds(1000)) ssc.checkpoint(checkpointDir) - val awsCredentials = KinesisTestUtils.getAWSCredentials() val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])] - val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) + val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc) + .checkpointAppName(appName) + .streamName(testUtils.streamName) + .endpointUrl(testUtils.endpointUrl) + .regionName(testUtils.regionName) + .initialPositionInStream(InitialPositionInStream.LATEST) + .checkpointInterval(Seconds(10)) + .storageLevel(StorageLevel.MEMORY_ONLY) + .build() // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => { -- cgit v1.2.3