diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2015-11-09 17:18:49 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-11-09 17:18:49 -0800 |
commit | 26062d22607e1f9854bc2588ba22a4e0f8bba48c (patch) | |
tree | c9a3c0e2a0bf59a481af2637e700500799569de0 /extras/kinesis-asl | |
parent | 61f9c8711c79f35d67b0456155866da316b131d9 (diff) | |
download | spark-26062d22607e1f9854bc2588ba22a4e0f8bba48c.tar.gz spark-26062d22607e1f9854bc2588ba22a4e0f8bba48c.tar.bz2 spark-26062d22607e1f9854bc2588ba22a4e0f8bba48c.zip |
[SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during recovery
While the KCL handles de-aggregation during the regular operation, during recovery we use the lower level api, and therefore need to de-aggregate the records.
tdas Testing is an issue, we need protobuf magic to do the aggregated records. Maybe we could depend on KPL for tests?
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #9403 from brkyvz/kinesis-deaggregation.
Diffstat (limited to 'extras/kinesis-asl')
-rw-r--r-- | extras/kinesis-asl/pom.xml | 6 | ||||
-rw-r--r-- | extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 6 | ||||
-rw-r--r-- | extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 1 | ||||
-rw-r--r-- | extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala | 2 | ||||
-rw-r--r-- | extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala | 12 | ||||
-rw-r--r-- | extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 17 | ||||
-rw-r--r-- | extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala (renamed from extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala) | 55 |
7 files changed, 74 insertions, 25 deletions
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index ef72d97eae..519a920279 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -65,6 +65,12 @@ <version>${aws.java.sdk.version}</version> </dependency> <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>amazon-kinesis-producer</artifactId> + <version>${aws.kinesis.producer.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 000897a4e7..691c1790b2 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -23,6 +23,7 @@ import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ import org.apache.spark._ @@ -210,7 +211,10 @@ class KinesisSequenceRangeIterator( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) } - (getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator) + // De-aggregate records, if KPL was used in producing the records. The KCL automatically + // handles de-aggregation during regular operation. This code path is used during recovery + val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords) + (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator) } /** diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 50993f157c..97dbb91857 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -216,7 +216,6 @@ private[kinesis] class KinesisReceiver[T]( val metadata = SequenceNumberRange(streamName, shardId, records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber()) blockGenerator.addMultipleDataWithCallback(dataIterator, metadata) - } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index e381ffa0cb..b5b76cb92d 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -80,7 +80,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w * more than once. */ logError(s"Exception: WorkerId $workerId encountered and exception while storing " + - " or checkpointing a batch for workerId $workerId and shardId $shardId.", e) + s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e) /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */ throw e diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index 9f9e146a08..52c61dfb1c 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -22,7 +22,8 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.{SparkConf, SparkContext, SparkException} -class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll { +abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) + extends KinesisFunSuite with BeforeAndAfterAll { private val testData = 1 to 8 @@ -37,13 +38,12 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll private var sc: SparkContext = null private var blockManager: BlockManager = null - override def beforeAll(): Unit = { runIfTestsEnabled("Prepare KinesisTestUtils") { testUtils = new KinesisTestUtils() testUtils.createStream() - shardIdToDataAndSeqNumbers = testUtils.pushData(testData) + shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData) require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq @@ -247,3 +247,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll Array.tabulate(num) { i => new StreamBlockId(0, i) } } } + +class WithAggregationKinesisBackedBlockRDDSuite + extends KinesisBackedBlockRDDTests(aggregateTestData = true) + +class WithoutAggregationKinesisBackedBlockRDDSuite + extends KinesisBackedBlockRDDTests(aggregateTestData = false) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index ba84e557df..dee30444d8 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo import org.apache.spark.util.Utils import org.apache.spark.{SparkConf, SparkContext} -class KinesisStreamSuite extends KinesisFunSuite +abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll { // This is the name that KCL will use to save metadata to DynamoDB @@ -182,13 +182,13 @@ class KinesisStreamSuite extends KinesisFunSuite val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => collected ++= rdd.collect() - logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) + logInfo("Collected = " + collected.mkString(", ")) } ssc.start() val testData = 1 to 10 eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData) + testUtils.pushData(testData, aggregateTestData) assert(collected === testData.toSet, "\nData received does not match data sent") } ssc.stop(stopSparkContext = false) @@ -207,13 +207,13 @@ class KinesisStreamSuite extends KinesisFunSuite val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] stream.foreachRDD { rdd => collected ++= rdd.collect() - logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) + logInfo("Collected = " + collected.mkString(", ")) } ssc.start() val testData = 1 to 10 eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData) + testUtils.pushData(testData, aggregateTestData) val modData = testData.map(_ + 5) assert(collected === modData.toSet, "\nData received does not match data sent") } @@ -254,7 +254,7 @@ class KinesisStreamSuite extends KinesisFunSuite // If this times out because numBatchesWithData is empty, then its likely that foreachRDD // function failed with exceptions, and nothing got added to `collectedData` eventually(timeout(2 minutes), interval(1 seconds)) { - testUtils.pushData(1 to 5) + testUtils.pushData(1 to 5, aggregateTestData) assert(isCheckpointPresent && numBatchesWithData > 10) } ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused @@ -285,5 +285,8 @@ class KinesisStreamSuite extends KinesisFunSuite } ssc.stop() } - } + +class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true) + +class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 634bf94521..7487aa1c12 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -31,6 +31,8 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.model._ +import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration, UserRecordResult} +import com.google.common.util.concurrent.{FutureCallback, Futures} import org.apache.spark.Logging @@ -64,6 +66,16 @@ private[kinesis] class KinesisTestUtils extends Logging { new DynamoDB(dynamoDBClient) } + private lazy val kinesisProducer: KinesisProducer = { + val conf = new KinesisProducerConfiguration() + .setRecordMaxBufferedTime(1000) + .setMaxConnections(1) + .setRegion(regionName) + .setMetricsLevel("none") + + new KinesisProducer(conf) + } + def streamName: String = { require(streamCreated, "Stream not yet created, call createStream() to create one") _streamName @@ -90,22 +102,41 @@ private[kinesis] class KinesisTestUtils extends Logging { * Push data to Kinesis stream and return a map of * shardId -> seq of (data, seq number) pushed to corresponding shard */ - def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = { + def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = { require(streamCreated, "Stream not yet created, call createStream() to create one") val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() testData.foreach { num => val str = num.toString - val putRecordRequest = new PutRecordRequest().withStreamName(streamName) - .withData(ByteBuffer.wrap(str.getBytes())) - .withPartitionKey(str) - - val putRecordResult = kinesisClient.putRecord(putRecordRequest) - val shardId = putRecordResult.getShardId - val seqNumber = putRecordResult.getSequenceNumber() - val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, - new ArrayBuffer[(Int, String)]()) - sentSeqNumbers += ((num, seqNumber)) + val data = ByteBuffer.wrap(str.getBytes()) + if (aggregate) { + val future = kinesisProducer.addUserRecord(streamName, str, data) + val kinesisCallBack = new FutureCallback[UserRecordResult]() { + override def onFailure(t: Throwable): Unit = {} // do nothing + + override def onSuccess(result: UserRecordResult): Unit = { + val shardId = result.getShardId + val seqNumber = result.getSequenceNumber() + val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, + new ArrayBuffer[(Int, String)]()) + sentSeqNumbers += ((num, seqNumber)) + } + } + + Futures.addCallback(future, kinesisCallBack) + kinesisProducer.flushSync() // make sure we send all data before returning the map + } else { + val putRecordRequest = new PutRecordRequest().withStreamName(streamName) + .withData(data) + .withPartitionKey(str) + + val putRecordResult = kinesisClient.putRecord(putRecordRequest) + val shardId = putRecordResult.getShardId + val seqNumber = putRecordResult.getSequenceNumber() + val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, + new ArrayBuffer[(Int, String)]()) + sentSeqNumbers += ((num, seqNumber)) + } } logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") @@ -116,7 +147,7 @@ private[kinesis] class KinesisTestUtils extends Logging { * Expose a Python friendly API. */ def pushData(testData: java.util.List[Int]): Unit = { - pushData(testData.asScala) + pushData(testData.asScala, aggregate = false) } def deleteStream(): Unit = { |