aboutsummaryrefslogtreecommitdiff
path: root/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
diff options
context:
space:
mode:
Diffstat (limited to 'extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala')
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala76
1 files changed, 0 insertions, 76 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
deleted file mode 100644
index 5223c81a8e..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import scala.reflect.ClassTag
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.streaming.{Duration, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
-
-private[kinesis] class KinesisInputDStream[T: ClassTag](
- _ssc: StreamingContext,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointAppName: String,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- messageHandler: Record => T,
- awsCredentialsOption: Option[SerializableAWSCredentials]
- ) extends ReceiverInputDStream[T](_ssc) {
-
- private[streaming]
- override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
-
- // This returns true even for when blockInfos is empty
- val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
-
- if (allBlocksHaveRanges) {
- // Create a KinesisBackedBlockRDD, even when there are no blocks
- val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
- val seqNumRanges = blockInfos.map {
- _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
- val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
- logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
- s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
- new KinesisBackedBlockRDD(
- context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
- isBlockIdValid = isBlockIdValid,
- retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
- messageHandler = messageHandler,
- awsCredentialsOption = awsCredentialsOption)
- } else {
- logWarning("Kinesis sequence number information was not present with some block metadata," +
- " it may not be possible to recover from failures")
- super.createBlockRDD(time, blockInfos)
- }
- }
-
- override def getReceiver(): Receiver[T] = {
- new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
- checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
- }
-}