Diffstat (limited to 'extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala')
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala | 15
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index 2e4204dcb6..72ab6357a5 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -17,7 +17,10 @@
package org.apache.spark.streaming.kinesis
+import scala.reflect.ClassTag
+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -26,7 +29,7 @@ import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
-private[kinesis] class KinesisInputDStream(
+private[kinesis] class KinesisInputDStream[T: ClassTag](
@transient _ssc: StreamingContext,
streamName: String,
endpointUrl: String,
@@ -35,11 +38,12 @@ private[kinesis] class KinesisInputDStream(
checkpointAppName: String,
checkpointInterval: Duration,
storageLevel: StorageLevel,
+ messageHandler: Record => T,
awsCredentialsOption: Option[SerializableAWSCredentials]
- ) extends ReceiverInputDStream[Array[Byte]](_ssc) {
+ ) extends ReceiverInputDStream[T](_ssc) {
private[streaming]
- override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[Array[Byte]] = {
+ override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
// This returns true even for when blockInfos is empty
val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
@@ -56,6 +60,7 @@ private[kinesis] class KinesisInputDStream(
context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
isBlockIdValid = isBlockIdValid,
retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
+ messageHandler = messageHandler,
awsCredentialsOption = awsCredentialsOption)
} else {
logWarning("Kinesis sequence number information was not present with some block metadata," +
@@ -64,8 +69,8 @@ private[kinesis] class KinesisInputDStream(
}
}
- override def getReceiver(): Receiver[Array[Byte]] = {
+ override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
- checkpointAppName, checkpointInterval, storageLevel, awsCredentialsOption)
+ checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
}
}
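
For context, a minimal usage sketch of the generalized message handler introduced by this change. It assumes the corresponding KinesisUtils.createStream overload that accepts a Record => T handler (which this patch series exposes); the app name, stream name, region, and handler body below are illustrative, not taken from this diff.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisMessageHandlerSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "kinesis-handler-sketch", Seconds(2))

    // Message handler of type Record => T: extract the partition key and the
    // payload as a string instead of the default raw Array[Byte].
    val handler = (r: Record) => {
      val bytes = new Array[Byte](r.getData.remaining())
      r.getData.get(bytes)
      (r.getPartitionKey, new String(bytes, "UTF-8"))
    }

    // Hypothetical wiring; argument names and order follow the constructor in
    // this diff, but the public factory may differ in a given Spark version.
    val stream = KinesisUtils.createStream(
      ssc,
      "my-checkpoint-app",                        // checkpointAppName
      "my-stream",                                // streamName
      "https://kinesis.us-east-1.amazonaws.com",  // endpointUrl
      "us-east-1",                                // regionName
      InitialPositionInStream.LATEST,
      Seconds(2),                                 // checkpointInterval
      StorageLevel.MEMORY_AND_DISK_2,
      handler)

    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Because KinesisInputDStream is now parameterized on T, createBlockRDD and getReceiver both return whatever type the handler produces, so callers no longer need an extra map over a raw Array[Byte] stream.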