author     Tathagata Das <tathagata.das1565@gmail.com>  2015-05-17 16:49:07 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-17 16:49:07 -0700
commit     ca4257aec658aaa87f4f097dd7534033d5f13ddc (patch)
tree       3ec231336161ad38e61c9f6d200aa3bc567a1225 /extras/kinesis-asl
parent     2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c (diff)
[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] Updates to the Kinesis API
SPARK-6514 - Use correct region
SPARK-5960 - Allow AWS Credentials to be directly passed
SPARK-6656 - Specify kinesis application name explicitly
SPARK-7679 - Upgrade to latest KCL and AWS SDK

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6147 from tdas/kinesis-api-update and squashes the following commits:

f23ea77 [Tathagata Das] Updated versions and updated APIs
373b201 [Tathagata Das] Updated Kinesis API
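For reference, a minimal sketch of the updated API this patch introduces; the app name, stream name, endpoint, and keys below are placeholders, and the signatures mirror the createStream overloads added in KinesisUtils.scala further down:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    val ssc = new StreamingContext(new SparkConf().setAppName("KinesisExample"), Seconds(2))

    // SPARK-6656 / SPARK-6514: application name and region are now explicit;
    // credentials fall back to the DefaultAWSCredentialsProviderChain.
    val stream = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myStream", "https://kinesis.us-west-2.amazonaws.com",
      "us-west-2", InitialPositionInStream.LATEST, Seconds(2),
      StorageLevel.MEMORY_AND_DISK_2)

    // SPARK-5960: AWS credentials can also be passed directly; note they are saved in
    // DStream checkpoints when checkpointing is enabled, so keep that directory secure.
    val streamWithKeys = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myStream", "https://kinesis.us-west-2.amazonaws.com",
      "us-west-2", InitialPositionInStream.LATEST, Seconds(2),
      StorageLevel.MEMORY_AND_DISK_2, "myAccessKeyId", "mySecretKey")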
Diffstat (limited to 'extras/kinesis-asl')
-rw-r--r-- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala |   2
-rw-r--r-- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala        | 152
-rw-r--r-- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala |  32
-rw-r--r-- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala           | 263
-rw-r--r-- extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala   |  15
5 files changed, 346 insertions(+), 118 deletions(-)
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 588e86a188..1c9b0c218a 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState(
/**
* Advance the checkpoint clock by the checkpoint interval.
*/
- def advanceCheckpoint() = {
+ def advanceCheckpoint(): Unit = {
checkpointClock.advance(checkpointInterval.milliseconds)
}
}
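For context on how advanceCheckpoint() is used by the record processor: checkpoints are gated on the configured interval, and the manual clock is then pushed forward. A minimal sketch, assuming a shouldCheckpoint() helper and a KCL checkpointer in scope (neither appears in this hunk):

    // Gate KCL checkpoints on the checkpoint interval, then advance the clock.
    if (checkpointState.shouldCheckpoint()) {
      checkpointer.checkpoint()            // IRecordProcessorCheckpointer from the KCL
      checkpointState.advanceCheckpoint()  // next checkpoint is due one interval later
    }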
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index a7fe4476ca..01608fbd3f 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -16,32 +16,31 @@
*/
package org.apache.spark.streaming.kinesis
-import java.net.InetAddress
import java.util.UUID
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
+
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils
-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+ extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
/**
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
* This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
* https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
- * as described here:
- * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers
- * to run within a Spark Executor.
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a
+ * Spark Executor.
*
* @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
* by the Kinesis Client Library. If you change the App name or Stream name,
@@ -49,6 +48,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
 * DynamoDB table with the same name as this Kinesis application.
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Region name used by the Kinesis Client Library for
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
@@ -59,92 +60,103 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
* @param storageLevel Storage level to use for storing the received objects
- *
- * @return ReceiverInputDStream[Array[Byte]]
+ * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
+ * the credentials
*/
private[kinesis] class KinesisReceiver(
appName: String,
streamName: String,
endpointUrl: String,
- checkpointInterval: Duration,
+ regionName: String,
initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel)
- extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
-
- /*
- * The following vars are built in the onStart() method which executes in the Spark Worker after
- * this code is serialized and shipped remotely.
- */
-
- /*
- * workerId should be based on the ip address of the actual Spark Worker where this code runs
- * (not the Driver's ip address.)
- */
- var workerId: String = null
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsCredentialsOption: Option[SerializableAWSCredentials]
+ ) extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
/*
- * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
- * in the following order of precedence:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file at the default location (~/.aws/credentials) shared by all
- * AWS SDKs and the AWS CLI
- * Instance profile credentials delivered through the Amazon EC2 metadata service
+ * =================================================================================
+ * The following vars are initialized in the onStart() method which executes in the
+ * Spark worker after this Receiver is serialized and shipped to the worker.
+ * =================================================================================
*/
- var credentialsProvider: AWSCredentialsProvider = null
-
- /* KCL config instance. */
- var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null
- /*
- * RecordProcessorFactory creates impls of IRecordProcessor.
- * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
- * IRecordProcessor.processRecords() method.
- * We're using our custom KinesisRecordProcessor in this case.
+ /**
+ * workerId used by the KCL; it should be based on the IP address of the actual Spark Worker
+ * where this code runs (not the driver's IP address).
*/
- var recordProcessorFactory: IRecordProcessorFactory = null
+ private var workerId: String = null
- /*
- * Create a Kinesis Worker.
- * This is the core client abstraction from the Kinesis Client Library (KCL).
- * We pass the RecordProcessorFactory from above as well as the KCL config instance.
- * A Kinesis Worker can process 1..* shards from the given stream - each with its
- * own RecordProcessor.
+ /**
+ * Worker is the core client abstraction from the Kinesis Client Library (KCL).
+ * A worker can process more than one shard from the given stream.
+ * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
+ * processors.
*/
- var worker: Worker = null
+ private var worker: Worker = null
/**
- * This is called when the KinesisReceiver starts and must be non-blocking.
- * The KCL creates and manages the receiving/processing thread pool through the Worker.run()
- * method.
+ * This is called when the KinesisReceiver starts and must be non-blocking.
+ * The KCL creates and manages the receiving/processing thread pool through Worker.run().
*/
override def onStart() {
workerId = Utils.localHostName() + ":" + UUID.randomUUID()
- credentialsProvider = new DefaultAWSCredentialsProviderChain()
- kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
- credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
- .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
- recordProcessorFactory = new IRecordProcessorFactory {
+
+ // KCL config instance
+ val awsCredProvider = resolveAWSCredentialsProvider()
+ val kinesisClientLibConfiguration =
+ new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId)
+ .withKinesisEndpoint(endpointUrl)
+ .withInitialPositionInStream(initialPositionInStream)
+ .withTaskBackoffTimeMillis(500)
+ .withRegionName(regionName)
+
+ /*
+ * RecordProcessorFactory creates impls of IRecordProcessor.
+ * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+ * IRecordProcessor.processRecords() method.
+ * We're using our custom KinesisRecordProcessor in this case.
+ */
+ val recordProcessorFactory = new IRecordProcessorFactory {
override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver,
workerId, new KinesisCheckpointState(checkpointInterval))
}
+
worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
worker.run()
+
logInfo(s"Started receiver with workerId $workerId")
}
/**
- * This is called when the KinesisReceiver stops.
- * The KCL worker.shutdown() method stops the receiving/processing threads.
- * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
+ * This is called when the KinesisReceiver stops.
+ * The KCL worker.shutdown() method stops the receiving/processing threads.
+ * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
*/
override def onStop() {
- worker.shutdown()
- logInfo(s"Shut down receiver with workerId $workerId")
+ if (worker != null) {
+ worker.shutdown()
+ logInfo(s"Stopped receiver for workerId $workerId")
+ worker = null
+ }
workerId = null
- credentialsProvider = null
- kinesisClientLibConfiguration = null
- recordProcessorFactory = null
- worker = null
+ }
+
+ /**
+ * If AWS credentials are provided, return an AWSCredentialsProvider returning those
+ * credentials. Otherwise, return the DefaultAWSCredentialsProviderChain.
+ */
+ private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
+ awsCredentialsOption match {
+ case Some(awsCredentials) =>
+ logInfo("Using provided AWS credentials")
+ new AWSCredentialsProvider {
+ override def getCredentials: AWSCredentials = awsCredentials
+ override def refresh(): Unit = { }
+ }
+ case None =>
+ logInfo("Using DefaultAWSCredentialsProviderChain")
+ new DefaultAWSCredentialsProviderChain()
+ }
}
}
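The credential-resolution pattern introduced above is easy to reproduce standalone. A minimal sketch (providerFor is a hypothetical helper name; the wrapping logic mirrors resolveAWSCredentialsProvider):

    import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}

    // Wrap fixed, user-supplied keys in a provider; otherwise fall back to the default
    // chain (env vars, system properties, ~/.aws/credentials, EC2 instance profile).
    def providerFor(keys: Option[(String, String)]): AWSCredentialsProvider = keys match {
      case Some((accessKeyId, secretKey)) =>
        new AWSCredentialsProvider {
          override def getCredentials: AWSCredentials =
            new BasicAWSCredentials(accessKeyId, secretKey)
          override def refresh(): Unit = { }  // static keys: nothing to refresh
        }
      case None => new DefaultAWSCredentialsProviderChain()
    }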
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index af8cd875b4..f65e743c4e 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -35,7 +35,10 @@ import com.amazonaws.services.kinesis.model.Record
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * shard in the Kinesis stream upon startup. This is normally done in separate threads,
+ * but the KCL instances within the KinesisReceivers will balance shard assignments among
+ * themselves if you create multiple Receivers.
*
* @param receiver Kinesis receiver
* @param workerId for logging purposes
@@ -47,8 +50,8 @@ private[kinesis] class KinesisRecordProcessor(
workerId: String,
checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
- /* shardId to be populated during initialize() */
- var shardId: String = _
+ // shardId to be populated during initialize()
+ private var shardId: String = _
/**
* The Kinesis Client Library calls this method during IRecordProcessor initialization.
@@ -56,8 +59,8 @@ private[kinesis] class KinesisRecordProcessor(
* @param shardId assigned by the KCL to this particular RecordProcessor.
*/
override def initialize(shardId: String) {
- logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId")
this.shardId = shardId
+ logInfo(s"Initialized workerId $workerId with shardId $shardId")
}
/**
@@ -73,12 +76,17 @@ private[kinesis] class KinesisRecordProcessor(
if (!receiver.isStopped()) {
try {
/*
- * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
- * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
- * internally-configured Spark serializer (kryo, etc).
- * This is not desirable, so we instead store a raw Array[Byte] and decouple
- * ourselves from Spark's internal serialization strategy.
- */
+ * Notes:
+ * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
+ * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
+ * internally-configured Spark serializer (kryo, etc).
+ * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
+ * ourselves from Spark's internal serialization strategy.
+ * 3) For performance, the BlockGenerator is asynchronously queuing elements within its
+ * memory before creating blocks. This prevents the small block scenario, but requires
+ * that you register callbacks to know when a block has been generated and stored
+ * (WAL is sufficient for storage) before you can checkpoint back to the source.
+ */
batch.foreach(record => receiver.store(record.getData().array()))
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
@@ -116,7 +124,7 @@ private[kinesis] class KinesisRecordProcessor(
logError(s"Exception: WorkerId $workerId encountered and exception while storing " +
" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
- /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+ /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
throw e
}
}
@@ -190,7 +198,7 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
}
- /* Throw: Shutdown has been requested by the Kinesis Client Library.*/
+ /* Throw: Shutdown has been requested by the Kinesis Client Library. */
case _: ShutdownException => {
logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
throw e
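The retryRandom helper in the last hunk retries a by-name expression with a random backoff. A minimal usage sketch (the call-site shape is an assumption; only the retry and rethrow cases appear in this hunk):

    // Retry a KCL checkpoint up to 4 times, backing off at most 100 ms per attempt.
    // Retryable exceptions trigger another attempt; ShutdownException is rethrown,
    // as the cases above show.
    KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)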
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 96f4399acc..b114bcff92 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -16,29 +16,75 @@
*/
package org.apache.spark.streaming.kinesis
-import org.apache.spark.annotation.Experimental
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
-import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.streaming.{Duration, StreamingContext}
-/**
- * Helper class to create Amazon Kinesis Input Stream
- * :: Experimental ::
- */
-@Experimental
object KinesisUtils {
/**
- * Create an InputDStream that pulls messages from a Kinesis stream.
- * :: Experimental ::
- * @param ssc StreamingContext object
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets the AWS credentials.
+ *
+ * @param ssc StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ def createStream(
+ ssc: StreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[Array[Byte]] = {
+ ssc.receiverStream(
+ new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, checkpointInterval, storageLevel, None))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
+ *
+ * @param ssc StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
@@ -48,28 +94,84 @@ object KinesisUtils {
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ def createStream(
+ ssc: StreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String
+ ): ReceiverInputDStream[Array[Byte]] = {
+ ssc.receiverStream(
+ new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, checkpointInterval, storageLevel,
+ Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
*
- * @return ReceiverInputDStream[Array[Byte]]
+ * Note:
+ * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets AWS credentials.
+ * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
+ * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
+ * [[org.apache.spark.SparkConf]].
+ *
+ * @param ssc Java StreamingContext object
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Endpoint url of Kinesis service
+ * (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
*/
- @Experimental
+ @deprecated("use other forms of createStream", "1.4.0")
def createStream(
ssc: StreamingContext,
streamName: String,
endpointUrl: String,
checkpointInterval: Duration,
initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
- ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl,
- checkpointInterval, initialPositionInStream, storageLevel))
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[Array[Byte]] = {
+ ssc.receiverStream(
+ new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, getRegionByEndpoint(endpointUrl),
+ initialPositionInStream, checkpointInterval, storageLevel, None))
}
/**
- * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
- * :: Experimental ::
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets the AWS credentials.
+ *
* @param jssc Java StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
@@ -79,19 +181,116 @@ object KinesisUtils {
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
*
- * @return JavaReceiverInputDStream[Array[Byte]]
+ * Note:
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
+ *
+ * @param jssc Java StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
*/
- @Experimental
def createStream(
- jssc: JavaStreamingContext,
- streamName: String,
- endpointUrl: String,
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval, storageLevel, awsAccessKeyId, awsSecretKey)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets AWS credentials.
+ * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
+ * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
+ * [[org.apache.spark.SparkConf]].
+ *
+ * @param jssc Java StreamingContext object
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Endpoint url of Kinesis service
+ * (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ @deprecated("use other forms of createStream", "1.4.0")
+ def createStream(
+ jssc: JavaStreamingContext,
+ streamName: String,
+ endpointUrl: String,
checkpointInterval: Duration,
initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
- jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName,
- endpointUrl, checkpointInterval, initialPositionInStream, storageLevel))
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ createStream(
+ jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
+ }
+
+ private def getRegionByEndpoint(endpointUrl: String): String = {
+ RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+ }
+
+ private def validateRegion(regionName: String): String = {
+ Option(RegionUtils.getRegion(regionName)).map { _.getName }.getOrElse {
+ throw new IllegalArgumentException(s"Region name '$regionName' is not valid")
+ }
}
}
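The two private helpers at the end lean on the SDK's RegionUtils; a short sketch of the expected behavior (the return values in the comments are assumptions about the SDK's region metadata, not output from this patch):

    import com.amazonaws.regions.RegionUtils

    // Deprecated overloads derive the region from the endpoint (SPARK-6514):
    RegionUtils.getRegionByEndpoint("https://kinesis.us-west-2.amazonaws.com").getName
    // expected: "us-west-2"

    // New overloads validate the explicit region name:
    Option(RegionUtils.getRegion("us-west-2")).map(_.getName)  // expected: Some("us-west-2")
    Option(RegionUtils.getRegion("not-a-region"))              // None, so validateRegion throws IllegalArgumentException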
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 255fe65819..7c17ee9dce 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -40,6 +40,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorC
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
/**
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
@@ -81,12 +82,20 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
checkpointStateMock, currentClockMock)
}
- test("kinesis utils api") {
+ test("KinesisUtils API") {
val ssc = new StreamingContext(master, framework, batchDuration)
// Tests the API, does not actually test data receiving
- val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+ val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
"https://kinesis.us-west-2.amazonaws.com", Seconds(2),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
+ val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
+ "awsAccessKey", "awsSecretKey")
+
ssc.stop()
}