diff options
Diffstat (limited to 'external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala')
-rw-r--r-- | external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala | 259 |
1 files changed, 247 insertions, 12 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index fbc6b99443..8970ad2baf 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -22,24 +22,28 @@ import scala.reflect.ClassTag import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.Record +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.{Duration, StreamingContext, Time} +import org.apache.spark.streaming.api.java.JavaStreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.ReceivedBlockInfo private[kinesis] class KinesisInputDStream[T: ClassTag]( _ssc: StreamingContext, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointAppName: String, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T, - kinesisCredsProvider: SerializableCredentialsProvider + val streamName: String, + val endpointUrl: String, + val regionName: String, + val initialPositionInStream: InitialPositionInStream, + val checkpointAppName: String, + val checkpointInterval: Duration, + val _storageLevel: StorageLevel, + val messageHandler: Record => T, + val kinesisCreds: SparkAWSCredentials, + val dynamoDBCreds: Option[SparkAWSCredentials], + val cloudWatchCreds: Option[SparkAWSCredentials] ) extends ReceiverInputDStream[T](_ssc) { private[streaming] @@ -61,7 +65,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag]( isBlockIdValid = isBlockIdValid, retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt, messageHandler = messageHandler, - kinesisCredsProvider = kinesisCredsProvider) + kinesisCreds = kinesisCreds) } else { logWarning("Kinesis sequence number information was not present with some block metadata," + " it may not be possible to recover from failures") @@ -71,7 +75,238 @@ private[kinesis] class KinesisInputDStream[T: ClassTag]( override def getReceiver(): Receiver[T] = { new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream, - checkpointAppName, checkpointInterval, storageLevel, messageHandler, - kinesisCredsProvider) + checkpointAppName, checkpointInterval, _storageLevel, messageHandler, + kinesisCreds, dynamoDBCreds, cloudWatchCreds) } } + +@InterfaceStability.Evolving +object KinesisInputDStream { + /** + * Builder for [[KinesisInputDStream]] instances. + * + * @since 2.2.0 + */ + @InterfaceStability.Evolving + class Builder { + // Required params + private var streamingContext: Option[StreamingContext] = None + private var streamName: Option[String] = None + private var checkpointAppName: Option[String] = None + + // Params with defaults + private var endpointUrl: Option[String] = None + private var regionName: Option[String] = None + private var initialPositionInStream: Option[InitialPositionInStream] = None + private var checkpointInterval: Option[Duration] = None + private var storageLevel: Option[StorageLevel] = None + private var kinesisCredsProvider: Option[SparkAWSCredentials] = None + private var dynamoDBCredsProvider: Option[SparkAWSCredentials] = None + private var cloudWatchCredsProvider: Option[SparkAWSCredentials] = None + + /** + * Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a + * required parameter. + * + * @param ssc [[StreamingContext]] used to construct Kinesis DStreams + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def streamingContext(ssc: StreamingContext): Builder = { + streamingContext = Option(ssc) + this + } + + /** + * Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a + * required parameter. + * + * @param jssc [[JavaStreamingContext]] used to construct Kinesis DStreams + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def streamingContext(jssc: JavaStreamingContext): Builder = { + streamingContext = Option(jssc.ssc) + this + } + + /** + * Sets the name of the Kinesis stream that the DStream will read from. This is a required + * parameter. + * + * @param streamName Name of Kinesis stream that the DStream will read from + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def streamName(streamName: String): Builder = { + this.streamName = Option(streamName) + this + } + + /** + * Sets the KCL application name to use when checkpointing state to DynamoDB. This is a + * required parameter. + * + * @param appName Value to use for the KCL app name (used when creating the DynamoDB checkpoint + * table and when writing metrics to CloudWatch) + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def checkpointAppName(appName: String): Builder = { + checkpointAppName = Option(appName) + this + } + + /** + * Sets the AWS Kinesis endpoint URL. Defaults to "https://kinesis.us-east-1.amazonaws.com" if + * no custom value is specified + * + * @param url Kinesis endpoint URL to use + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def endpointUrl(url: String): Builder = { + endpointUrl = Option(url) + this + } + + /** + * Sets the AWS region to construct clients for. Defaults to "us-east-1" if no custom value + * is specified. + * + * @param regionName Name of AWS region to use (e.g. "us-west-2") + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def regionName(regionName: String): Builder = { + this.regionName = Option(regionName) + this + } + + /** + * Sets the initial position data is read from in the Kinesis stream. Defaults to + * [[InitialPositionInStream.LATEST]] if no custom value is specified. + * + * @param initialPosition InitialPositionInStream value specifying where Spark Streaming + * will start reading records in the Kinesis stream from + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = { + initialPositionInStream = Option(initialPosition) + this + } + + /** + * Sets how often the KCL application state is checkpointed to DynamoDB. Defaults to the Spark + * Streaming batch interval if no custom value is specified. + * + * @param interval [[Duration]] specifying how often the KCL state should be checkpointed to + * DynamoDB. + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def checkpointInterval(interval: Duration): Builder = { + checkpointInterval = Option(interval) + this + } + + /** + * Sets the storage level of the blocks for the DStream created. Defaults to + * [[StorageLevel.MEMORY_AND_DISK_2]] if no custom value is specified. + * + * @param storageLevel [[StorageLevel]] to use for the DStream data blocks + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def storageLevel(storageLevel: StorageLevel): Builder = { + this.storageLevel = Option(storageLevel) + this + } + + /** + * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS Kinesis + * endpoint. Defaults to [[DefaultCredentialsProvider]] if no custom value is specified. + * + * @param credentials [[SparkAWSCredentials]] to use for Kinesis authentication + */ + def kinesisCredentials(credentials: SparkAWSCredentials): Builder = { + kinesisCredsProvider = Option(credentials) + this + } + + /** + * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS DynamoDB + * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set. + * + * @param credentials [[SparkAWSCredentials]] to use for DynamoDB authentication + */ + def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder = { + dynamoDBCredsProvider = Option(credentials) + this + } + + /** + * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch + * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set. + * + * @param credentials [[SparkAWSCredentials]] to use for CloudWatch authentication + */ + def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder = { + cloudWatchCredsProvider = Option(credentials) + this + } + + /** + * Create a new instance of [[KinesisInputDStream]] with configured parameters and the provided + * message handler. + * + * @param handler Function converting [[Record]] instances read by the KCL to DStream type [[T]] + * @return Instance of [[KinesisInputDStream]] constructed with configured parameters + */ + def buildWithMessageHandler[T: ClassTag]( + handler: Record => T): KinesisInputDStream[T] = { + val ssc = getRequiredParam(streamingContext, "streamingContext") + new KinesisInputDStream( + ssc, + getRequiredParam(streamName, "streamName"), + endpointUrl.getOrElse(DEFAULT_KINESIS_ENDPOINT_URL), + regionName.getOrElse(DEFAULT_KINESIS_REGION_NAME), + initialPositionInStream.getOrElse(DEFAULT_INITIAL_POSITION_IN_STREAM), + getRequiredParam(checkpointAppName, "checkpointAppName"), + checkpointInterval.getOrElse(ssc.graph.batchDuration), + storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL), + handler, + kinesisCredsProvider.getOrElse(DefaultCredentials), + dynamoDBCredsProvider, + cloudWatchCredsProvider) + } + + /** + * Create a new instance of [[KinesisInputDStream]] with configured parameters and using the + * default message handler, which returns [[Array[Byte]]]. + * + * @return Instance of [[KinesisInputDStream]] constructed with configured parameters + */ + def build(): KinesisInputDStream[Array[Byte]] = buildWithMessageHandler(defaultMessageHandler) + + private def getRequiredParam[T](param: Option[T], paramName: String): T = param.getOrElse { + throw new IllegalArgumentException(s"No value provided for required parameter $paramName") + } + } + + /** + * Creates a [[KinesisInputDStream.Builder]] for constructing [[KinesisInputDStream]] instances. + * + * @since 2.2.0 + * + * @return [[KinesisInputDStream.Builder]] instance + */ + def builder: Builder = new Builder + + private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = { + if (record == null) return null + val byteBuffer = record.getData() + val byteArray = new Array[Byte](byteBuffer.remaining()) + byteBuffer.get(byteArray) + byteArray + } + + private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String = + "https://kinesis.us-east-1.amazonaws.com" + private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1" + private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPositionInStream = + InitialPositionInStream.LATEST + private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2 +} |