author     proflin <proflin.me@gmail.com>  2016-03-21 08:02:06 +0000
committer  Sean Owen <sowen@cloudera.com>  2016-03-21 08:02:06 +0000
commit     c35c60fa916e92916442a98f4af123704bb9692e
tree       05ee40c2a6f8832a8e5a7364c0c9e3d1d329bf39
parent     761c2d1b6ee0482e2ba15664c4938eb121dda070
[SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings
## What changes were proposed in this pull request?

- Removed two methods that have been deprecated since 1.4
- Fixed two other compilation warnings

## How was this patch tested?

Existing test suites.

Author: proflin <proflin.me@gmail.com>

Closes #11850 from lw-lin/streaming-kinesis-deprecates-warnings.
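For callers migrating off the removed overloads, here is a minimal sketch of the replacement call. The signature matches the one exercised in the updated tests below; the app name, stream name, endpoint, and region values are illustrative placeholders, and the removed form is shown in a comment for contrast.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

val conf = new SparkConf().setAppName("KinesisMigrationSketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// Removed form (deprecated since 1.4): the KCL app name was taken from
// SparkConf and the region was inferred from the endpoint.
//   KinesisUtils.createStream(ssc, "mySparkStream", endpointUrl,
//     Seconds(2), InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)

// Replacement: pass the Kinesis application name and region explicitly.
val stream = KinesisUtils.createStream(
  ssc, "myAppName", "mySparkStream",
  "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
  InitialPositionInStream.LATEST, Seconds(2),
  StorageLevel.MEMORY_AND_DISK_2)
```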
Diffstat (limited to 'external/kinesis-asl')
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala        | 86
-rw-r--r--  external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java | 11
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala      |  2
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala   | 12
4 files changed, 13 insertions(+), 98 deletions(-)
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 15ac588b82..a0007d33d6 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -225,51 +225,6 @@ object KinesisUtils {
* Create an input stream that pulls messages from a Kinesis stream.
* This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
*
- * Note:
- *
- * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets AWS credentials.
- * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
- * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
- * in [[org.apache.spark.SparkConf]].
- *
- * @param ssc StreamingContext object
- * @param streamName Kinesis stream name
- * @param endpointUrl Endpoint url of Kinesis service
- * (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- @deprecated("use other forms of createStream", "1.4.0")
- def createStream(
- ssc: StreamingContext,
- streamName: String,
- endpointUrl: String,
- checkpointInterval: Duration,
- initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel
- ): ReceiverInputDStream[Array[Byte]] = {
- ssc.withNamedScope("kinesis stream") {
- new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
- getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
- checkpointInterval, storageLevel, defaultMessageHandler, None)
- }
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
* Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
* on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
* gets the AWS credentials.
@@ -453,47 +408,6 @@ object KinesisUtils {
defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
}
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets AWS credentials.
- * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
- * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
- * [[org.apache.spark.SparkConf]].
- *
- * @param jssc Java StreamingContext object
- * @param streamName Kinesis stream name
- * @param endpointUrl Endpoint url of Kinesis service
- * (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- @deprecated("use other forms of createStream", "1.4.0")
- def createStream(
- jssc: JavaStreamingContext,
- streamName: String,
- endpointUrl: String,
- checkpointInterval: Duration,
- initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Array[Byte]] = {
- createStream(
- jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
- }
-
private def getRegionByEndpoint(endpointUrl: String): String = {
RegionUtils.getRegionByEndpoint(endpointUrl).getName()
}
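The retained getRegionByEndpoint helper delegates to the AWS SDK's RegionUtils; the updated Java test below performs the same lookup directly. A small sketch of that lookup, assuming the AWS Java SDK 1.x API:

```scala
import com.amazonaws.regions.RegionUtils

// Resolve a region name from a Kinesis endpoint URL; for this endpoint
// the lookup should yield "us-west-2".
val regionName = RegionUtils.getRegionByEndpoint(
  "https://kinesis.us-west-2.amazonaws.com").getName()
```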
diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
index 5c2371c543..f078973c6c 100644
--- a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
+++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis;
+import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.model.Record;
import org.junit.Test;
@@ -34,11 +35,13 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKinesisStream() {
- // Tests the API, does not actually test data receiving
- JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+ String dummyEndpointUrl = KinesisTestUtils.defaultEndpointUrl();
+ String dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName();
+ // Tests the API, does not actually test data receiving
+ JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000),
+ StorageLevel.MEMORY_AND_DISK_2());
ssc.stop();
}
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
index ee428f31d6..1c81298a7c 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite {
if (shouldRunTests) {
body
} else {
- ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
+ ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
}
}
}
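The `()` to `(())` change silences a Scala compiler warning: calling a method whose single by-name parameter has type Unit with an empty argument list makes the compiler adapt the call by inserting `()`, which is deprecated. Passing the unit literal `(())` explicitly avoids the adaptation. A toy reproduction with a simplified signature (not ScalaTest's actual `ignore`):

```scala
// Simplified stand-in for ScalaTest's ignore(name)(body).
def ignore(name: String)(body: => Unit): Unit = println(s"ignored: $name")

ignore("slow test")()    // warns: adaptation of an empty argument list
ignore("slow test")(())  // passes the Unit value explicitly; no warning
```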
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 4460b6bcca..0e71bf9b84 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -99,14 +99,10 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
}
test("KinesisUtils API") {
- // Tests the API, does not actually test data receiving
- val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
- dummyEndpointUrl, Seconds(2),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+ val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+ val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
dummyAWSAccessKey, dummyAWSSecretKey)
@@ -154,7 +150,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Verify that KinesisBackedBlockRDD is generated even when there are no blocks
val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
- emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+ // Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]], because
+ // the type parameter will be erased at runtime
+ emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
emptyRDD.partitions shouldBe empty
// Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
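The matcher change above accounts for JVM type erasure: `shouldBe a [T]` can only inspect the runtime class, so the element type of `KinesisBackedBlockRDD[Array[Byte]]` is unverifiable and asserting it draws an unchecked warning. A standalone sketch of the same effect:

```scala
val xs: Any = List(1, 2, 3)

// Only the erased class is checkable at runtime:
println(xs.isInstanceOf[List[_]])  // true
// xs.isInstanceOf[List[String]] compiles only with an "unchecked"
// warning and would also evaluate to true, because the String type
// argument is erased at runtime.
```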