Diffstat (limited to 'extras')
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala      20
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala        71
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala           195
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala     76
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala           63
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala               21
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala  18
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala             4
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala       41
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala        222
10 files changed, 566 insertions, 165 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 8f144a4d97..a003ddf325 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -37,16 +37,18 @@ case class SequenceNumberRange(
/** Class representing an array of Kinesis sequence number ranges */
private[kinesis]
-case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) {
+case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
def isEmpty(): Boolean = ranges.isEmpty
+
def nonEmpty(): Boolean = ranges.nonEmpty
+
override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")")
}
private[kinesis]
object SequenceNumberRanges {
def apply(range: SequenceNumberRange): SequenceNumberRanges = {
- new SequenceNumberRanges(Array(range))
+ new SequenceNumberRanges(Seq(range))
}
}
@@ -66,14 +68,14 @@ class KinesisBackedBlockRDDPartition(
*/
private[kinesis]
class KinesisBackedBlockRDD(
- sc: SparkContext,
- regionId: String,
- endpointUrl: String,
+ @transient sc: SparkContext,
+ val regionName: String,
+ val endpointUrl: String,
@transient blockIds: Array[BlockId],
- @transient arrayOfseqNumberRanges: Array[SequenceNumberRanges],
+ @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient isBlockIdValid: Array[Boolean] = Array.empty,
- retryTimeoutMs: Int = 10000,
- awsCredentialsOption: Option[SerializableAWSCredentials] = None
+ val retryTimeoutMs: Int = 10000,
+ val awsCredentialsOption: Option[SerializableAWSCredentials] = None
) extends BlockRDD[Array[Byte]](sc, blockIds) {
require(blockIds.length == arrayOfseqNumberRanges.length,
@@ -104,7 +106,7 @@ class KinesisBackedBlockRDD(
}
partition.seqNumberRanges.ranges.iterator.flatMap { range =>
new KinesisSequenceRangeIterator(
- credenentials, endpointUrl, regionId, range, retryTimeoutMs)
+ credenentials, endpointUrl, regionName, range, retryTimeoutMs)
}
}
if (partition.isBlockIdValid) {
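One motivation for switching the SequenceNumberRanges field from Array to Seq is that case class equality is not structural for Array fields, which matters whenever ranges are compared (for example when recovered ranges are checked against recorded ones in the tests below). The following sketch is illustration only, not part of the patch; the class names are hypothetical.

// Minimal sketch: why Seq is preferable to Array inside a case class that is compared for equality.
case class ArrayRanges(ranges: Array[Int])
case class SeqRanges(ranges: Seq[Int])

object EqualityDemo {
  def main(args: Array[String]): Unit = {
    // Array equality is reference equality, so two structurally identical instances are NOT equal.
    println(ArrayRanges(Array(1, 2)) == ArrayRanges(Array(1, 2)))  // false
    // Seq uses structural equality, so comparisons behave as expected.
    println(SeqRanges(Seq(1, 2)) == SeqRanges(Seq(1, 2)))          // true
  }
}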
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
new file mode 100644
index 0000000000..2e4204dcb6
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+
+private[kinesis] class KinesisInputDStream(
+ @transient _ssc: StreamingContext,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointAppName: String,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsCredentialsOption: Option[SerializableAWSCredentials]
+ ) extends ReceiverInputDStream[Array[Byte]](_ssc) {
+
+ private[streaming]
+ override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[Array[Byte]] = {
+
+ // This returns true even when blockInfos is empty
+ val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
+
+ if (allBlocksHaveRanges) {
+ // Create a KinesisBackedBlockRDD, even when there are no blocks
+ val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+ val seqNumRanges = blockInfos.map {
+ _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
+ val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+ logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
+ s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+ new KinesisBackedBlockRDD(
+ context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
+ isBlockIdValid = isBlockIdValid,
+ retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
+ awsCredentialsOption = awsCredentialsOption)
+ } else {
+ logWarning("Kinesis sequence number information was not present with some block metadata," +
+ " it may not be possible to recover from failures")
+ super.createBlockRDD(time, blockInfos)
+ }
+ }
+
+ override def getReceiver(): Receiver[Array[Byte]] = {
+ new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
+ checkpointAppName, checkpointInterval, storageLevel, awsCredentialsOption)
+ }
+}
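The comment in createBlockRDD notes that the metadata check passes even for an empty batch. That is because forall on an empty collection is vacuously true, so an empty batch still takes the KinesisBackedBlockRDD branch and yields an RDD with no partitions. A minimal illustration (hypothetical names, not from the patch):

// forall on an empty collection is vacuously true.
object ForallDemo {
  def main(args: Array[String]): Unit = {
    val emptyBlockMetadata = Seq.empty[Option[String]]
    println(emptyBlockMetadata.forall(_.nonEmpty))        // true: no element violates the predicate
    println(Seq(Some("range"), None).forall(_.nonEmpty))  // false: one block lacks metadata
  }
}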
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 1a8a4cecc1..a4baeec084 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -18,17 +18,20 @@ package org.apache.spark.streaming.kinesis
import java.util.UUID
+import scala.collection.JavaConversions.asScalaIterator
+import scala.collection.mutable
import scala.util.control.NonFatal
-import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
+import com.amazonaws.services.kinesis.model.Record
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkEnv}
private[kinesis]
@@ -42,38 +45,47 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
* This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
* https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
- * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers to run within a
- * Spark Executor.
*
- * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
- * by the Kinesis Client Library. If you change the App name or Stream name,
- * the KCL will throw errors. This usually requires deleting the backing
- * DynamoDB table with the same name this Kinesis application.
+ * The way this Receiver works is as follows:
+ * - The receiver starts a KCL Worker, which essentially runs a thread pool of multiple
+ *   KinesisRecordProcessors.
+ * - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
+ *   inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
+ * - When the block generator seals a block, the sequence number ranges that were inserted
+ *   into that block are recorded separately for later use.
+ * - When the block is ready to be pushed, the block is pushed and the ranges are reported as
+ *   metadata of the block. In addition, the ranges are used to find the latest sequence
+ *   number for each shard that can be checkpointed through DynamoDB.
+ * - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
+ *   number for its own shard.
+ *
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Region name used by the Kinesis Client Library for
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
+ * by the Kinesis Client Library. If you change the App name or Stream name,
+ * the KCL will throw errors. This usually requires deleting the backing
+ * DynamoDB table with the same name as this Kinesis application.
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects
* @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
* the credentials
*/
private[kinesis] class KinesisReceiver(
- appName: String,
- streamName: String,
+ val streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
+ checkpointAppName: String,
checkpointInterval: Duration,
storageLevel: StorageLevel,
awsCredentialsOption: Option[SerializableAWSCredentials]
@@ -90,7 +102,7 @@ private[kinesis] class KinesisReceiver(
* workerId is used by the KCL should be based on the ip address of the actual Spark Worker
* where this code runs (not the driver's IP address.)
*/
- private var workerId: String = null
+ @volatile private var workerId: String = null
/**
* Worker is the core client abstraction from the Kinesis Client Library (KCL).
@@ -98,22 +110,40 @@ private[kinesis] class KinesisReceiver(
* Each shard is assigned its own IRecordProcessor and the worker run multiple such
* processors.
*/
- private var worker: Worker = null
+ @volatile private var worker: Worker = null
+ @volatile private var workerThread: Thread = null
- /** Thread running the worker */
- private var workerThread: Thread = null
+ /** BlockGenerator used to generate blocks out of Kinesis data */
+ @volatile private var blockGenerator: BlockGenerator = null
/**
+ * Sequence number ranges added to the current block being generated.
+ * Accessing and updating of this map is synchronized by locks in BlockGenerator.
+ */
+ private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
+
+ /** Sequence number ranges of data added to each generated block */
+ private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
+ with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
+
+ /**
+ * Latest sequence number for each shard that has been successfully stored.
+ * This is used for checkpointing through the KCL.
+ */
+ private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String]
+ with mutable.SynchronizedMap[String, String]
+ /**
* This is called when the KinesisReceiver starts and must be non-blocking.
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
*/
override def onStart() {
+ blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, SparkEnv.get.conf)
+
workerId = Utils.localHostName() + ":" + UUID.randomUUID()
// KCL config instance
val awsCredProvider = resolveAWSCredentialsProvider()
val kinesisClientLibConfiguration =
- new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId)
+ new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
@@ -141,6 +171,10 @@ private[kinesis] class KinesisReceiver(
}
}
}
+
+ blockIdToSeqNumRanges.clear()
+ blockGenerator.start()
+
workerThread.setName(s"Kinesis Receiver ${streamId}")
workerThread.setDaemon(true)
workerThread.start()
@@ -165,6 +199,81 @@ private[kinesis] class KinesisReceiver(
workerId = null
}
+ /** Add records of the given shard to the current block being generated */
+ private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
+ if (records.size > 0) {
+ val dataIterator = records.iterator().map { record =>
+ val byteBuffer = record.getData()
+ val byteArray = new Array[Byte](byteBuffer.remaining())
+ byteBuffer.get(byteArray)
+ byteArray
+ }
+ val metadata = SequenceNumberRange(streamName, shardId,
+ records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
+ blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
+
+ }
+ }
+
+ /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
+ private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
+ shardIdToLatestStoredSeqNum.get(shardId)
+ }
+
+ /**
+ * Remember the range of sequence numbers that was added to the currently active block.
+ * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
+ */
+ private def rememberAddedRange(range: SequenceNumberRange): Unit = {
+ seqNumRangesInCurrentBlock += range
+ }
+
+ /**
+ * Finalize the ranges added to the block that was active and prepare the ranges buffer
+ * for next block. Internally, this is synchronized with `rememberAddedRange()`.
+ */
+ private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
+ blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
+ seqNumRangesInCurrentBlock.clear()
+ logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
+ }
+
+ /** Store the block along with its associated ranges */
+ private def storeBlockWithRanges(
+ blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[Array[Byte]]): Unit = {
+ val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId)
+ if (rangesToReportOption.isEmpty) {
+ stop("Error while storing block into Spark, could not find sequence number ranges " +
+ s"for block $blockId")
+ return
+ }
+
+ val rangesToReport = rangesToReportOption.get
+ var attempt = 0
+ var stored = false
+ var throwable: Throwable = null
+ while (!stored && attempt <= 3) {
+ try {
+ store(arrayBuffer, rangesToReport)
+ stored = true
+ } catch {
+ case NonFatal(th) =>
+ attempt += 1
+ throwable = th
+ }
+ }
+ if (!stored) {
+ stop("Error while storing block into Spark", throwable)
+ }
+
+ // Update the latest sequence number that has been successfully stored for each shard.
+ // Note that we are doing this sequentially because the array of sequence number ranges
+ // is assumed to be in the order in which the data was added to the block.
+ rangesToReport.ranges.foreach { range =>
+ shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber
+ }
+ }
+
/**
* If AWS credential is provided, return a AWSCredentialProvider returning that credential.
* Otherwise, return the DefaultAWSCredentialsProviderChain.
@@ -182,4 +291,46 @@ private[kinesis] class KinesisReceiver(
new DefaultAWSCredentialsProviderChain()
}
}
+
+
+ /**
+ * Class to handle blocks generated by this receiver's block generator. Specifically, in
+ * the context of the Kinesis Receiver, this handler does the following.
+ *
+ * - When an array of records is added to the current active block in the block generator,
+ * this handler keeps track of the corresponding sequence number range.
+ * - When the currently active block is ready to be sealed (no more records), this handler
+ *   keeps track of the list of ranges added into that block in another map, keyed by block id.
+ */
+ private class GeneratedBlockHandler extends BlockGeneratorListener {
+
+ /**
+ * Callback method called after a data item is added into the BlockGenerator.
+ * The data addition, block generation, and calls to onAddData and onGenerateBlock
+ * are all synchronized through the same lock.
+ */
+ def onAddData(data: Any, metadata: Any): Unit = {
+ rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
+ }
+
+ /**
+ * Callback method called after a block has been generated.
+ * The data addition, block generation, and calls to onAddData and onGenerateBlock
+ * are all synchronized through the same lock.
+ */
+ def onGenerateBlock(blockId: StreamBlockId): Unit = {
+ finalizeRangesForCurrentBlock(blockId)
+ }
+
+ /** Callback method called when a block is ready to be pushed / stored. */
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ storeBlockWithRanges(blockId,
+ arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Array[Byte]]])
+ }
+
+ /** Callback called in case of any error internal to the BlockGenerator */
+ def onError(message: String, throwable: Throwable): Unit = {
+ reportError(message, throwable)
+ }
+ }
}
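The GeneratedBlockHandler callbacks drive a three-step bookkeeping cycle: accumulate sequence number ranges for the active block, freeze them when the block is sealed, and record the latest stored sequence number per shard once the block is pushed. The standalone sketch below mirrors that cycle with simplified, hypothetical types and no Spark dependencies; it is an illustration, not the receiver's actual code.

import scala.collection.mutable

object RangeBookkeepingSketch {
  case class Range(shardId: String, fromSeqNumber: String, toSeqNumber: String)

  private val currentBlockRanges = mutable.ArrayBuffer.empty[Range]
  private val blockToRanges = mutable.HashMap.empty[Long, Seq[Range]]
  private val shardToLatestStoredSeqNum = mutable.HashMap.empty[String, String]

  // ~ rememberAddedRange: accumulate ranges for the block currently being built
  def onAddData(range: Range): Unit = currentBlockRanges += range

  // ~ finalizeRangesForCurrentBlock: freeze the accumulated ranges under the new block id
  def onGenerateBlock(blockId: Long): Unit = {
    blockToRanges(blockId) = currentBlockRanges.toList
    currentBlockRanges.clear()
  }

  // ~ storeBlockWithRanges: once the block is stored, remember the latest seq number per shard
  def onPushBlock(blockId: Long): Unit =
    blockToRanges.remove(blockId).getOrElse(Nil).foreach { r =>
      shardToLatestStoredSeqNum(r.shardId) = r.toSeqNumber
    }

  def main(args: Array[String]): Unit = {
    onAddData(Range("shard-0", "1", "5"))
    onAddData(Range("shard-0", "6", "9"))
    onGenerateBlock(blockId = 1L)
    onPushBlock(blockId = 1L)
    println(shardToLatestStoredSeqNum)  // Map(shard-0 -> 9): safe to checkpoint up to 9
  }
}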
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index fe9e3a0c79..b240512332 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -18,20 +18,16 @@ package org.apache.spark.streaming.kinesis
import java.util.List
-import scala.collection.JavaConversions.asScalaBuffer
import scala.util.Random
+import scala.util.control.NonFatal
-import org.apache.spark.Logging
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
+import org.apache.spark.Logging
+
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
@@ -51,6 +47,7 @@ private[kinesis] class KinesisRecordProcessor(
checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
// shardId to be populated during initialize()
+ @volatile
private var shardId: String = _
/**
@@ -75,47 +72,38 @@ private[kinesis] class KinesisRecordProcessor(
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
if (!receiver.isStopped()) {
try {
- /*
- * Notes:
- * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
- * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
- * internally-configured Spark serializer (kryo, etc).
- * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
- * ourselves from Spark's internal serialization strategy.
- * 3) For performance, the BlockGenerator is asynchronously queuing elements within its
- * memory before creating blocks. This prevents the small block scenario, but requires
- * that you register callbacks to know when a block has been generated and stored
- * (WAL is sufficient for storage) before can checkpoint back to the source.
- */
- batch.foreach(record => receiver.store(record.getData().array()))
-
- logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+ receiver.addRecords(shardId, batch)
+ logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
/*
- * Checkpoint the sequence number of the last record successfully processed/stored
- * in the batch.
- * In this implementation, we're checkpointing after the given checkpointIntervalMillis.
- * Note that this logic requires that processRecords() be called AND that it's time to
- * checkpoint. I point this out because there is no background thread running the
- * checkpointer. Checkpointing is tested and trigger only when a new batch comes in.
- * If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
- * However, if the worker dies unexpectedly, a checkpoint may not happen.
- * This could lead to records being processed more than once.
+ *
+ * Checkpoint the sequence number of the last record successfully stored.
+ * Note that in the current implementation, checkpointing occurs only after
+ * checkpointIntervalMillis have elapsed since the last checkpoint, AND when there are new
+ * records to process. This causes checkpointing to lag behind the records that have been
+ * stored by the receiver. Of course, this can lead to records being processed more than once,
+ * under failures and restarts.
+ *
+ * TODO: Instead of checkpointing here, run a separate timer task to perform
+ * checkpointing so that it checkpoints in a timely manner independent of whether
+ * new records are available or not.
*/
if (checkpointState.shouldCheckpoint()) {
- /* Perform the checkpoint */
- KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+ receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
+ /* Perform the checkpoint */
+ KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100)
- /* Update the next checkpoint time */
- checkpointState.advanceCheckpoint()
+ /* Update the next checkpoint time */
+ checkpointState.advanceCheckpoint()
- logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
+ logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
s" records for shardId $shardId")
- logDebug(s"Checkpoint: Next checkpoint is at " +
+ logDebug(s"Checkpoint: Next checkpoint is at " +
s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId")
+ }
}
} catch {
- case e: Throwable => {
+ case NonFatal(e) => {
/*
* If there is a failure within the batch, the batch will not be checkpointed.
* This will potentially cause records since the last checkpoint to be processed
@@ -130,7 +118,7 @@ private[kinesis] class KinesisRecordProcessor(
}
} else {
/* RecordProcessor has been stopped. */
- logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" +
+ logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" +
s" and shardId $shardId. No more records will be processed.")
}
}
@@ -154,7 +142,11 @@ private[kinesis] class KinesisRecordProcessor(
* It's now OK to read from the new shards that resulted from a resharding event.
*/
case ShutdownReason.TERMINATE =>
- KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+ val latestSeqNumToCheckpointOption = receiver.getLatestSeqNumToCheckpoint(shardId)
+ if (latestSeqNumToCheckpointOption.nonEmpty) {
+ KinesisRecordProcessor.retryRandom(
+ checkpointer.checkpoint(latestSeqNumToCheckpointOption.get), 4, 100)
+ }
/*
* ZOMBIE Use Case. NoOp.
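The comment above describes a checkpoint policy that fires only when a new batch arrives and the checkpoint interval has elapsed, which is why checkpoints can lag behind what has been stored. A simplified sketch of such an interval-based policy follows; the class is hypothetical and stands in for neither KinesisCheckpointState nor any KCL API.

class IntervalCheckpointPolicy(intervalMs: Long, now: () => Long = () => System.currentTimeMillis()) {
  private var nextCheckpointTimeMs = now() + intervalMs

  /** Called once per incoming batch; returns true if a checkpoint should happen now. */
  def shouldCheckpoint(): Boolean = now() >= nextCheckpointTimeMs

  /** Advance the deadline after a successful checkpoint. */
  def advanceCheckpoint(): Unit = { nextCheckpointTimeMs = now() + intervalMs }
}

object CheckpointPolicyDemo {
  def main(args: Array[String]): Unit = {
    var fakeTime = 0L
    val policy = new IntervalCheckpointPolicy(intervalMs = 1000, now = () => fakeTime)
    println(policy.shouldCheckpoint())  // false: interval has not elapsed yet
    fakeTime = 1500                     // time passes, but nothing happens until a batch triggers the check
    println(policy.shouldCheckpoint())  // true, only once the next batch calls this
    policy.advanceCheckpoint()
    println(policy.shouldCheckpoint())  // false again until another interval elapses
  }
}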
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 255ac27f79..711aade182 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -36,22 +36,10 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
-private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {
-
- def this() {
- this("https://kinesis.us-west-2.amazonaws.com", "")
- }
-
- def this(endpointUrl: String) {
- this(endpointUrl, "")
- }
-
- val regionName = if (_regionName.length == 0) {
- RegionUtils.getRegionByEndpoint(endpointUrl).getName()
- } else {
- RegionUtils.getRegion(_regionName).getName()
- }
+private class KinesisTestUtils extends Logging {
+ val endpointUrl = KinesisTestUtils.endpointUrl
+ val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val streamShardCount = 2
private val createStreamTimeoutSeconds = 300
@@ -81,11 +69,11 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
}
def createStream(): Unit = {
- logInfo("Creating stream")
require(!streamCreated, "Stream already created")
_streamName = findNonExistentStreamName()
// Create a stream. The number of shards determines the provisioned throughput.
+ logInfo(s"Creating stream ${_streamName}")
val createStreamRequest = new CreateStreamRequest()
createStreamRequest.setStreamName(_streamName)
createStreamRequest.setShardCount(2)
@@ -94,7 +82,7 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
// The stream is now being created. Wait for it to become active.
waitForStreamToBeActive(_streamName)
streamCreated = true
- logInfo("Created stream")
+ logInfo(s"Created stream ${_streamName}")
}
/**
@@ -191,9 +179,38 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
private[kinesis] object KinesisTestUtils {
- val envVarName = "ENABLE_KINESIS_TESTS"
+ val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
+ val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
+ val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
+
+ lazy val shouldRunTests = {
+ val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+ if (isEnvSet) {
+ // scalastyle:off println
+ // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+ println(
+ s"""
+ |Kinesis tests that actually send data have been enabled by setting the environment
+ |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
+ |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
+ |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
+ |To change this endpoint URL to a different region, you can set the environment variable
+ |$endVarNameForEndpoint to the desired endpoint URL
+ |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
+ """.stripMargin)
+ // scalastyle:on println
+ }
+ isEnvSet
+ }
- val shouldRunTests = sys.env.get(envVarName) == Some("1")
+ lazy val endpointUrl = {
+ val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
+ // scalastyle:off println
+ // Print this so that they are easily visible on the console and not hidden in the log4j logs.
+ println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
+ // scalastyle:on println
+ url
+ }
def isAWSCredentialsPresent: Boolean = {
Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
@@ -205,7 +222,13 @@ private[kinesis] object KinesisTestUtils {
Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
case Success(cred) => cred
case Failure(e) =>
- throw new Exception("Kinesis tests enabled, but could get not AWS credentials")
+ throw new Exception(
+ s"""
+ |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
+ |but could not find AWS credentials. Please follow instructions in AWS documentation
+ |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
+ |can find the credentials.
+ """.stripMargin)
}
}
}
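Based on the call sites in the test suites (createStream, pushData, deleteStream, deleteDynamoDBTable), a live test might drive KinesisTestUtils roughly as sketched below. The sketch assumes it lives inside the org.apache.spark.streaming.kinesis package (the class is package-private), that ENABLE_KINESIS_TESTS=1 is set, and the application name is hypothetical.

package org.apache.spark.streaming.kinesis

object KinesisTestUtilsSketch {
  def main(args: Array[String]): Unit = {
    if (KinesisTestUtils.shouldRunTests) {       // gated by ENABLE_KINESIS_TESTS=1
      val utils = new KinesisTestUtils()         // endpoint from KINESIS_TEST_ENDPOINT_URL or the default
      try {
        utils.createStream()
        utils.pushData(1 to 10)                  // per the suites, returns per-shard data and sequence numbers
      } finally {
        utils.deleteStream()
        utils.deleteDynamoDBTable("my-test-app") // hypothetical KCL application name
      }
    }
  }
}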
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 7dab17eba8..c799fadf2d 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -65,9 +65,8 @@ object KinesisUtils {
): ReceiverInputDStream[Array[Byte]] = {
// Setting scope to override receiver stream's scope of "receiver stream"
ssc.withNamedScope("kinesis stream") {
- ssc.receiverStream(
- new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, checkpointInterval, storageLevel, None))
+ new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, None)
}
}
@@ -112,10 +111,11 @@ object KinesisUtils {
awsAccessKeyId: String,
awsSecretKey: String
): ReceiverInputDStream[Array[Byte]] = {
- ssc.receiverStream(
- new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, checkpointInterval, storageLevel,
- Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))))
+ ssc.withNamedScope("kinesis stream") {
+ new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+ Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+ }
}
/**
@@ -155,9 +155,10 @@ object KinesisUtils {
initialPositionInStream: InitialPositionInStream,
storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]] = {
- ssc.receiverStream(
- new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, getRegionByEndpoint(endpointUrl),
- initialPositionInStream, checkpointInterval, storageLevel, None))
+ ssc.withNamedScope("kinesis stream") {
+ new KinesisInputDStream(ssc, streamName, endpointUrl, getRegionByEndpoint(endpointUrl),
+ initialPositionInStream, ssc.sc.appName, checkpointInterval, storageLevel, None)
+ }
}
/**
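For reference, the public entry point after this change is still KinesisUtils.createStream; only the receiver stream construction moves into KinesisInputDStream. A hedged usage sketch mirroring the overload exercised in KinesisStreamSuite below (stream, endpoint, region, and credential values are placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisWordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    val stream = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myStream",
      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
      InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_AND_DISK_2,
      "<aws-access-key-id>", "<aws-secret-key>")  // placeholders; avoid hard-coding credentials
    stream.map(bytes => new String(bytes)).print() // records arrive as Array[Byte]
    ssc.start()
    ssc.awaitTermination()
  }
}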
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index e81fb11e59..a89e5627e0 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -24,8 +24,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
- private val regionId = "us-east-1"
- private val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
private val testData = 1 to 8
private var testUtils: KinesisTestUtils = null
@@ -42,7 +40,7 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
override def beforeAll(): Unit = {
runIfTestsEnabled("Prepare KinesisTestUtils") {
- testUtils = new KinesisTestUtils(endpointUrl)
+ testUtils = new KinesisTestUtils()
testUtils.createStream()
shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
@@ -75,21 +73,21 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
testIfEnabled("Basic reading from Kinesis") {
// Verify all data using multiple ranges in a single RDD partition
- val receivedData1 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+ val receivedData1 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
fakeBlockIds(1),
Array(SequenceNumberRanges(allRanges.toArray))
).map { bytes => new String(bytes).toInt }.collect()
assert(receivedData1.toSet === testData.toSet)
// Verify all data using one range in each of the multiple RDD partitions
- val receivedData2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+ val receivedData2 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
fakeBlockIds(allRanges.size),
allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
).map { bytes => new String(bytes).toInt }.collect()
assert(receivedData2.toSet === testData.toSet)
// Verify ordering within each partition
- val receivedData3 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
+ val receivedData3 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
fakeBlockIds(allRanges.size),
allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
).map { bytes => new String(bytes).toInt }.collectPartitions()
@@ -211,7 +209,8 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
}, "Incorrect configuration of RDD, unexpected ranges set"
)
- val rdd = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds, ranges)
+ val rdd = new KinesisBackedBlockRDD(
+ sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
val collectedData = rdd.map { bytes =>
new String(bytes).toInt
}.collect()
@@ -224,8 +223,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
if (testIsBlockValid) {
require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
- val rdd2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds.toArray,
- ranges, isBlockIdValid = Array.fill(blockIds.length)(false))
+ val rdd2 = new KinesisBackedBlockRDD(
+ sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
+ isBlockIdValid = Array.fill(blockIds.length)(false))
intercept[SparkException] {
rdd2.collect()
}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
index 8373138785..ee428f31d6 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -31,7 +31,7 @@ trait KinesisFunSuite extends SparkFunSuite {
if (shouldRunTests) {
test(testName)(testBody)
} else {
- ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
+ ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
}
}
@@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite {
if (shouldRunTests) {
body
} else {
- ignore(s"$message [enable by setting env var $envVarName=1]")()
+ ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
}
}
}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 98f2c7c4f1..ceb135e065 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -22,15 +22,14 @@ import scala.collection.JavaConversions.seqAsJavaList
import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
+import org.mockito.Matchers._
import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, Matchers}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{Milliseconds, TestSuiteBase}
import org.apache.spark.util.{Clock, ManualClock, Utils}
/**
@@ -44,6 +43,8 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
val endpoint = "endpoint-url"
val workerId = "dummyWorkerId"
val shardId = "dummyShardId"
+ val seqNum = "dummySeqNum"
+ val someSeqNum = Some(seqNum)
val record1 = new Record()
record1.setData(ByteBuffer.wrap("Spark In Action".getBytes()))
@@ -80,16 +81,18 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
test("process records including store and checkpoint") {
when(receiverMock.isStopped()).thenReturn(false)
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.initialize(shardId)
recordProcessor.processRecords(batch, checkpointerMock)
verify(receiverMock, times(1)).isStopped()
- verify(receiverMock, times(1)).store(record1.getData().array())
- verify(receiverMock, times(1)).store(record2.getData().array())
+ verify(receiverMock, times(1)).addRecords(shardId, batch)
+ verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId)
verify(checkpointStateMock, times(1)).shouldCheckpoint()
- verify(checkpointerMock, times(1)).checkpoint()
+ verify(checkpointerMock, times(1)).checkpoint(anyString)
verify(checkpointStateMock, times(1)).advanceCheckpoint()
}
@@ -100,19 +103,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
recordProcessor.processRecords(batch, checkpointerMock)
verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
+ verify(checkpointerMock, never).checkpoint(anyString)
}
test("shouldn't checkpoint when exception occurs during store") {
when(receiverMock.isStopped()).thenReturn(false)
- when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException())
+ when(
+ receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
+ ).thenThrow(new RuntimeException())
intercept[RuntimeException] {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.initialize(shardId)
recordProcessor.processRecords(batch, checkpointerMock)
}
verify(receiverMock, times(1)).isStopped()
- verify(receiverMock, times(1)).store(record1.getData().array())
+ verify(receiverMock, times(1)).addRecords(shardId, batch)
+ verify(checkpointerMock, never).checkpoint(anyString)
}
test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
@@ -158,19 +167,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}
test("shutdown should checkpoint if the reason is TERMINATE") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
- val reason = ShutdownReason.TERMINATE
- recordProcessor.shutdown(checkpointerMock, reason)
+ recordProcessor.initialize(shardId)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
- verify(checkpointerMock, times(1)).checkpoint()
+ verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId)
+ verify(checkpointerMock, times(1)).checkpoint(anyString)
}
test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
+ recordProcessor.initialize(shardId)
recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
recordProcessor.shutdown(checkpointerMock, null)
- verify(checkpointerMock, never()).checkpoint()
+ verify(checkpointerMock, never).checkpoint(anyString)
}
test("retry success on first attempt") {
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index b88c9c6478..1177dc7581 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -22,34 +22,67 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
+import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.streaming.kinesis.KinesisTestUtils._
+import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkConf, SparkContext}
class KinesisStreamSuite extends KinesisFunSuite
with Eventually with BeforeAndAfter with BeforeAndAfterAll {
- // This is the name that KCL uses to save metadata to DynamoDB
- private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+ // This is the name that KCL will use to save metadata to DynamoDB
+ private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+ private val batchDuration = Seconds(1)
- private var ssc: StreamingContext = _
- private var sc: SparkContext = _
+ // Dummy parameters for API testing
+ private val dummyEndpointUrl = defaultEndpointUrl
+ private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
+ private val dummyAWSAccessKey = "dummyAccessKey"
+ private val dummyAWSSecretKey = "dummySecretKey"
+
+ private var testUtils: KinesisTestUtils = null
+ private var ssc: StreamingContext = null
+ private var sc: SparkContext = null
override def beforeAll(): Unit = {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
sc = new SparkContext(conf)
+
+ runIfTestsEnabled("Prepare KinesisTestUtils") {
+ testUtils = new KinesisTestUtils()
+ testUtils.createStream()
+ }
}
override def afterAll(): Unit = {
- sc.stop()
- // Delete the Kinesis stream as well as the DynamoDB table generated by
- // Kinesis Client Library when consuming the stream
+ if (ssc != null) {
+ ssc.stop()
+ }
+ if (sc != null) {
+ sc.stop()
+ }
+ if (testUtils != null) {
+ // Delete the Kinesis stream as well as the DynamoDB table generated by
+ // Kinesis Client Library when consuming the stream
+ testUtils.deleteStream()
+ testUtils.deleteDynamoDBTable(appName)
+ }
+ }
+
+ before {
+ ssc = new StreamingContext(sc, batchDuration)
}
after {
@@ -57,21 +90,75 @@ class KinesisStreamSuite extends KinesisFunSuite
ssc.stop(stopSparkContext = false)
ssc = null
}
+ if (testUtils != null) {
+ testUtils.deleteDynamoDBTable(appName)
+ }
}
test("KinesisUtils API") {
- ssc = new StreamingContext(sc, Seconds(1))
// Tests the API, does not actually test data receiving
val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+ dummyEndpointUrl, Seconds(2),
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
- "awsAccessKey", "awsSecretKey")
+ dummyAWSAccessKey, dummyAWSSecretKey)
+ }
+
+ test("RDD generation") {
+ val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
+ dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
+ StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
+ assert(inputStream.isInstanceOf[KinesisInputDStream])
+
+ val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream]
+ val time = Time(1000)
+
+ // Generate block info data for testing
+ val seqNumRanges1 = SequenceNumberRanges(
+ SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
+ val blockId1 = StreamBlockId(kinesisStream.id, 123)
+ val blockInfo1 = ReceivedBlockInfo(
+ 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+ val seqNumRanges2 = SequenceNumberRanges(
+ SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
+ val blockId2 = StreamBlockId(kinesisStream.id, 345)
+ val blockInfo2 = ReceivedBlockInfo(
+ 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+ // Verify that the generated KinesisBackedBlockRDD has all the right information
+ val blockInfos = Seq(blockInfo1, blockInfo2)
+ val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
+ nonEmptyRDD shouldBe a [KinesisBackedBlockRDD]
+ val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD]
+ assert(kinesisRDD.regionName === dummyRegionName)
+ assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
+ assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+ assert(kinesisRDD.awsCredentialsOption ===
+ Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
+ assert(nonEmptyRDD.partitions.size === blockInfos.size)
+ nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
+ val partitions = nonEmptyRDD.partitions.map {
+ _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
+ assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
+ assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
+ assert(partitions.forall { _.isBlockIdValid === true })
+
+ // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
+ val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
+ emptyRDD shouldBe a [KinesisBackedBlockRDD]
+ emptyRDD.partitions shouldBe empty
+
+ // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
+ blockInfos.foreach { _.setBlockIdInvalid() }
+ kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
+ assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
+ }
}
@@ -84,32 +171,91 @@ class KinesisStreamSuite extends KinesisFunSuite
* and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
*/
testIfEnabled("basic operation") {
- val kinesisTestUtils = new KinesisTestUtils()
- try {
- kinesisTestUtils.createStream()
- ssc = new StreamingContext(sc, Seconds(1))
- val awsCredentials = KinesisTestUtils.getAWSCredentials()
- val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
- kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
- Seconds(10), StorageLevel.MEMORY_ONLY,
- awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
- stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
- }
- ssc.start()
+ val awsCredentials = KinesisTestUtils.getAWSCredentials()
+ val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+ testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY,
+ awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
- val testData = 1 to 10
- eventually(timeout(120 seconds), interval(10 second)) {
- kinesisTestUtils.pushData(testData)
- assert(collected === testData.toSet, "\nData received does not match data sent")
+ val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+ collected ++= rdd.collect()
+ logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+ }
+ ssc.start()
+
+ val testData = 1 to 10
+ eventually(timeout(120 seconds), interval(10 second)) {
+ testUtils.pushData(testData)
+ assert(collected === testData.toSet, "\nData received does not match data sent")
+ }
+ ssc.stop(stopSparkContext = false)
+ }
+
+ testIfEnabled("failure recovery") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ val checkpointDir = Utils.createTempDir().getAbsolutePath
+
+ ssc = new StreamingContext(sc, Milliseconds(1000))
+ ssc.checkpoint(checkpointDir)
+
+ val awsCredentials = KinesisTestUtils.getAWSCredentials()
+ val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+ with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+
+ val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+ testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY,
+ awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+ // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
+ kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
+ val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD]
+ val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
+ collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ })
+
+ ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
+ ssc.start()
+
+ def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+
+ def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
+
+ // Run until there are at least 10 batches with some data in them
+ // If this times out because numBatchesWithData stays at zero, then it is likely that the
+ // foreachRDD function failed with exceptions and nothing got added to `collectedData`
+ eventually(timeout(2 minutes), interval(1 seconds)) {
+ testUtils.pushData(1 to 5)
+ assert(isCheckpointPresent && numBatchesWithData > 10)
+ }
+ ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused
+
+ // Restart the context from the checkpoint and verify that the recovered stream recomputes the same data
+ logInfo("Restarting from checkpoint")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ val recoveredKinesisStream = ssc.graph.getInputStreams().head
+
+ // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
+ // and return the same data
+ val times = collectedData.keySet
+ times.foreach { time =>
+ val (arrayOfSeqNumRanges, data) = collectedData(time)
+ val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+ rdd shouldBe a [KinesisBackedBlockRDD]
+
+ // Verify the recovered sequence ranges
+ val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD]
+ assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+ arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+ assert(expected.ranges.toSeq === found.ranges.toSeq)
}
- ssc.stop()
- } finally {
- kinesisTestUtils.deleteStream()
- kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+
+ // Verify the recovered data
+ assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
+ ssc.stop()
}
+
}
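The failure-recovery test restarts a StreamingContext from its checkpoint and verifies that the Kinesis-backed batches are recomputed from the recorded sequence number ranges, without a write-ahead log. In an application, the corresponding driver-side pattern would look roughly like the sketch below; the checkpoint directory and the stream setup body are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableKinesisApp {
  val checkpointDir = "/tmp/kinesis-checkpoint"   // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableKinesisApp")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // ... set up the Kinesis input stream and output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a clean start this calls createContext(); after a failure it rebuilds the
    // DStream graph (including the KinesisInputDStream) from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}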