author    Tathagata Das <tathagata.das1565@gmail.com>  2015-08-05 00:20:26 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-08-05 00:20:26 -0700
commit    c2a71f0714b7a6ab30c1c4998f606f782428971c
tree      2d5c2a126b648e0d51c1039fce83de5f17c9b660
parent    781c8d71a0a6a86c84048a4f22cb3a7d035a5be2
[SPARK-9217] [STREAMING] Make the kinesis receiver reliable by recording sequence numbers
This PR is the second one in the larger issue of making the Kinesis integration reliable and providing a WAL-free at-least-once guarantee. It is based on the design doc - https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit

In this PR, I have updated the Kinesis Receiver to do the following.
- Control the block generation, by creating its own BlockGenerator with its own callback methods and using it to keep track of the ranges of sequence numbers that go into each block.
- More specifically, as the KinesisRecordProcessor provides small batches of records, the records are atomically inserted into the block (that is, either the whole batch is in the block, or not). The sequence-number range of each batch is recorded accordingly. Since many batches may be added to a block, the receiver tracks all the ranges of sequence numbers added to a block (see the sketch after this message).
- When the block is ready to be pushed, the block is pushed and the ranges are reported as metadata of the block. In addition, the ranges are used to find the latest sequence number for each shard that can be checkpointed through DynamoDB.
- Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence number for its own shard.
- The array of ranges in the block metadata is used to create KinesisBackedBlockRDDs. The ReceiverInputDStream has been slightly refactored to allow the creation of KinesisBackedBlockRDDs instead of the WALBackedBlockRDDs.

Things to be done
- [x] Add new test to verify that the sequence numbers are recovered.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7825 from tdas/kinesis-receiver and squashes the following commits:

2159be9 [Tathagata Das] Fixed bug
569be83 [Tathagata Das] Fix scala style issue
bf31e22 [Tathagata Das] Added more documentation to make the kinesis test endpoint more configurable
3ad8361 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into kinesis-receiver
c693a63 [Tathagata Das] Removed unnecessary constructor params from KinesisTestUtils
e1f1d0a [Tathagata Das] Addressed PR comments
b9fa6bf [Tathagata Das] Fix serialization issues
f8b7680 [Tathagata Das] Updated doc
33fe43a [Tathagata Das] Added more tests
7997138 [Tathagata Das] Fix style errors
a806710 [Tathagata Das] Fixed unit test and use KinesisInputDStream
40a1709 [Tathagata Das] Fixed KinesisReceiverSuite tests
7e44df6 [Tathagata Das] Added documentation and fixed checkpointing
096383f [Tathagata Das] Added test, and addressed some of the comments.
84a7892 [Tathagata Das] fixed scala style issue
e19e37d [Tathagata Das] Added license
1cd7b66 [Tathagata Das] Updated kinesis receiver
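To make the mechanism concrete, the following is a minimal sketch of the range-tracking idea described above. SequenceNumberRange and BlockTracker are hypothetical names used for illustration, not the actual classes this PR adds on the Kinesis side.

// Illustrative sketch only: SequenceNumberRange and BlockTracker are
// hypothetical stand-ins, not the classes added by this PR.
object RangeTrackingSketch {
  import scala.collection.mutable.ArrayBuffer

  case class SequenceNumberRange(shardId: String, fromSeqNumber: String, toSeqNumber: String)

  class BlockTracker {
    // Ranges of all record batches atomically added to the block being built.
    private val currentRanges = ArrayBuffer.empty[SequenceNumberRange]

    // Called for each small batch from the record processor: the batch goes
    // into the block entirely or not at all, so one range per batch suffices.
    def onBatchAdded(shardId: String, firstSeq: String, lastSeq: String): Unit =
      synchronized {
        currentRanges += SequenceNumberRange(shardId, firstSeq, lastSeq)
      }

    // Called when the block is pushed: hand back the accumulated ranges as
    // the block's metadata and reset the tracker for the next block.
    def onBlockPushed(): List[SequenceNumberRange] = synchronized {
      val ranges = currentRanges.toList
      currentRanges.clear()
      ranges
    }
  }

  // After a block is successfully stored, the latest sequence number per
  // shard can be derived from its ranges and checkpointed, e.g. to DynamoDB.
  // Assumes ranges were appended in processing order, as above.
  def latestCheckpointablePerShard(ranges: Seq[SequenceNumberRange]): Map[String, String] =
    ranges.groupBy(_.shardId).map { case (shard, rs) => shard -> rs.last.toSeqNumber }
}

The synchronized blocks model the atomic insert-and-record step the description relies on, since record processors for different shards may feed the same block.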
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 71
1 file changed, 39 insertions(+), 32 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 670ef8d296..a15800917c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -21,12 +21,12 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
-import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, RateController, StreamInputInfo}
import org.apache.spark.streaming.util.WriteAheadLogUtils
+import org.apache.spark.streaming.{StreamingContext, Time}
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -79,48 +79,55 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext
// for this batch
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
- val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Register the input blocks information into InputInfoTracker
val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
- if (blockInfos.nonEmpty) {
- // Are WAL record handles present with all the blocks
- val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+ // Create the BlockRDD
+ createBlockRDD(validTime, blockInfos)
+ }
+ }
+ Some(blockRDD)
+ }
- if (areWALRecordHandlesPresent) {
- // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
- val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
- val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
- new WriteAheadLogBackedBlockRDD[T](
- ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
- } else {
- // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
- // others then that is unexpected and log a warning accordingly.
- if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
- if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
- logError("Some blocks do not have Write Ahead Log information; " +
- "this is unexpected and data may not be recoverable after driver failures")
- } else {
- logWarning("Some blocks have Write Ahead Log information; this is unexpected")
- }
- }
- new BlockRDD[T](ssc.sc, blockIds)
- }
- } else {
- // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
- // according to the configuration
+ private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
+
+ if (blockInfos.nonEmpty) {
+ val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+
+ // Are WAL record handles present with all the blocks
+ val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+
+ if (areWALRecordHandlesPresent) {
+ // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
+ val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+ val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
+ new WriteAheadLogBackedBlockRDD[T](
+ ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
+ } else {
+ // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
+ // others then that is unexpected and log a warning accordingly.
+ if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
- new WriteAheadLogBackedBlockRDD[T](
- ssc.sparkContext, Array.empty, Array.empty, Array.empty)
+ logError("Some blocks do not have Write Ahead Log information; " +
+ "this is unexpected and data may not be recoverable after driver failures")
} else {
- new BlockRDD[T](ssc.sc, Array.empty)
+ logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
+ new BlockRDD[T](ssc.sc, blockIds)
+ }
+ } else {
+ // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
+ // according to the configuration
+ if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+ new WriteAheadLogBackedBlockRDD[T](
+ ssc.sparkContext, Array.empty, Array.empty, Array.empty)
+ } else {
+ new BlockRDD[T](ssc.sc, Array.empty)
}
}
- Some(blockRDD)
}
/**
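For readability, here is the decision flow that the diff above moves into createBlockRDD, condensed into a self-contained sketch. BlockInfo and RDDKind are simplified stand-ins for Spark's ReceivedBlockInfo and the concrete RDD classes; note that the diff's blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty is equivalent to an exists call, which the sketch uses.

// Simplified paraphrase of the new createBlockRDD; BlockInfo and RDDKind are
// stand-ins for Spark's ReceivedBlockInfo and the concrete RDD classes.
object CreateBlockRDDSketch {
  case class BlockInfo(blockId: String, walRecordHandle: Option[String])

  sealed trait RDDKind
  case object WalBackedBlockRDD extends RDDKind // recoverable from the write-ahead log
  case object PlainBlockRDD extends RDDKind     // backed only by executor-cached blocks

  def createBlockRDD(blockInfos: Seq[BlockInfo], receiverWalEnabled: Boolean): RDDKind = {
    if (blockInfos.nonEmpty) {
      if (blockInfos.forall(_.walRecordHandle.nonEmpty)) {
        // All blocks have WAL record handles: a WAL-backed RDD can re-read
        // the data from the log if the executor-cached blocks are lost.
        WalBackedBlockRDD
      } else {
        // A mix of WAL and non-WAL blocks is unexpected; the log severity
        // depends on whether the receiver WAL is enabled in the configuration.
        if (blockInfos.exists(_.walRecordHandle.nonEmpty)) {
          if (receiverWalEnabled) {
            Console.err.println("Some blocks do not have Write Ahead Log information; " +
              "this is unexpected and data may not be recoverable after driver failures")
          } else {
            Console.err.println("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        PlainBlockRDD
      }
    } else {
      // No blocks ready for this batch: return an empty RDD whose flavor
      // matches the configuration.
      if (receiverWalEnabled) WalBackedBlockRDD else PlainBlockRDD
    }
  }
}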