aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
author Burak Yavuz <brkyvz@gmail.com> 2015-11-09 17:18:49 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-11-09 17:18:49 -0800
commit 26062d22607e1f9854bc2588ba22a4e0f8bba48c (patch)
tree c9a3c0e2a0bf59a481af2637e700500799569de0 /extras
parent 61f9c8711c79f35d67b0456155866da316b131d9 (diff)
download spark-26062d22607e1f9854bc2588ba22a4e0f8bba48c.tar.gz
spark-26062d22607e1f9854bc2588ba22a4e0f8bba48c.tar.bz2
spark-26062d22607e1f9854bc2588ba22a4e0f8bba48c.zip
[SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during recovery
While the KCL handles de-aggregation during regular operation, during recovery we use the lower-level API and therefore need to de-aggregate the records ourselves. tdas Testing is an issue: we need protobuf magic to produce the aggregated records. Maybe we could depend on the KPL for tests? Author: Burak Yavuz <brkyvz@gmail.com> Closes #9403 from brkyvz/kinesis-deaggregation.
Diffstat (limited to 'extras')
-rw-r--r-- extras/kinesis-asl/pom.xml 6
-rw-r--r-- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala 6
-rw-r--r-- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala 1
-rw-r--r-- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala 2
-rw-r--r-- extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala 12
-rw-r--r-- extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala 17
-rw-r--r-- extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala (renamed from extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala) 55
7 files changed, 74 insertions, 25 deletions
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index ef72d97eae..519a920279 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -65,6 +65,12 @@
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-kinesis-producer</artifactId>
+ <version>${aws.kinesis.producer.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 000897a4e7..691c1790b2 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
import com.amazonaws.services.kinesis.model._
import org.apache.spark._
@@ -210,7 +211,10 @@ class KinesisSequenceRangeIterator(
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)
}
- (getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator)
+ // De-aggregate records if the KPL was used to produce them. The KCL automatically
+ // handles de-aggregation during regular operation. This code path is used during recovery.
+ val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
+ (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
}
/**
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 50993f157c..97dbb91857 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -216,7 +216,6 @@ private[kinesis] class KinesisReceiver[T](
val metadata = SequenceNumberRange(streamName, shardId,
records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
-
}
}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index e381ffa0cb..b5b76cb92d 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -80,7 +80,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
* more than once.
*/
logError(s"Exception: WorkerId $workerId encountered and exception while storing " +
- " or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
+ s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
/* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
throw e
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 9f9e146a08..52c61dfb1c 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -22,7 +22,8 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
-class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
+abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
+ extends KinesisFunSuite with BeforeAndAfterAll {
private val testData = 1 to 8
@@ -37,13 +38,12 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
private var sc: SparkContext = null
private var blockManager: BlockManager = null
-
override def beforeAll(): Unit = {
runIfTestsEnabled("Prepare KinesisTestUtils") {
testUtils = new KinesisTestUtils()
testUtils.createStream()
- shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
+ shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
@@ -247,3 +247,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
Array.tabulate(num) { i => new StreamBlockId(0, i) }
}
}
+
+class WithAggregationKinesisBackedBlockRDDSuite
+ extends KinesisBackedBlockRDDTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisBackedBlockRDDSuite
+ extends KinesisBackedBlockRDDTests(aggregateTestData = false)
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ba84e557df..dee30444d8 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkContext}
-class KinesisStreamSuite extends KinesisFunSuite
+abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
with Eventually with BeforeAndAfter with BeforeAndAfterAll {
// This is the name that KCL will use to save metadata to DynamoDB
@@ -182,13 +182,13 @@ class KinesisStreamSuite extends KinesisFunSuite
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
- logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+ logInfo("Collected = " + collected.mkString(", "))
}
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
- testUtils.pushData(testData)
+ testUtils.pushData(testData, aggregateTestData)
assert(collected === testData.toSet, "\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
@@ -207,13 +207,13 @@ class KinesisStreamSuite extends KinesisFunSuite
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.foreachRDD { rdd =>
collected ++= rdd.collect()
- logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+ logInfo("Collected = " + collected.mkString(", "))
}
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
- testUtils.pushData(testData)
+ testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
assert(collected === modData.toSet, "\nData received does not match data sent")
}
@@ -254,7 +254,7 @@ class KinesisStreamSuite extends KinesisFunSuite
// If this times out because numBatchesWithData is empty, then it's likely that foreachRDD
// function failed with exceptions, and nothing got added to `collectedData`
eventually(timeout(2 minutes), interval(1 seconds)) {
- testUtils.pushData(1 to 5)
+ testUtils.pushData(1 to 5, aggregateTestData)
assert(isCheckpointPresent && numBatchesWithData > 10)
}
ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused
@@ -285,5 +285,8 @@ class KinesisStreamSuite extends KinesisFunSuite
}
ssc.stop()
}
-
}
+
+class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 634bf94521..7487aa1c12 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -31,6 +31,8 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model._
+import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration, UserRecordResult}
+import com.google.common.util.concurrent.{FutureCallback, Futures}
import org.apache.spark.Logging
@@ -64,6 +66,16 @@ private[kinesis] class KinesisTestUtils extends Logging {
new DynamoDB(dynamoDBClient)
}
+ private lazy val kinesisProducer: KinesisProducer = {
+ val conf = new KinesisProducerConfiguration()
+ .setRecordMaxBufferedTime(1000)
+ .setMaxConnections(1)
+ .setRegion(regionName)
+ .setMetricsLevel("none")
+
+ new KinesisProducer(conf)
+ }
+
def streamName: String = {
require(streamCreated, "Stream not yet created, call createStream() to create one")
_streamName
@@ -90,22 +102,41 @@ private[kinesis] class KinesisTestUtils extends Logging {
* Push data to Kinesis stream and return a map of
* shardId -> seq of (data, seq number) pushed to corresponding shard
*/
- def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+ def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
require(streamCreated, "Stream not yet created, call createStream() to create one")
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
testData.foreach { num =>
val str = num.toString
- val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
- .withData(ByteBuffer.wrap(str.getBytes()))
- .withPartitionKey(str)
-
- val putRecordResult = kinesisClient.putRecord(putRecordRequest)
- val shardId = putRecordResult.getShardId
- val seqNumber = putRecordResult.getSequenceNumber()
- val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
- new ArrayBuffer[(Int, String)]())
- sentSeqNumbers += ((num, seqNumber))
+ val data = ByteBuffer.wrap(str.getBytes())
+ if (aggregate) {
+ val future = kinesisProducer.addUserRecord(streamName, str, data)
+ val kinesisCallBack = new FutureCallback[UserRecordResult]() {
+ override def onFailure(t: Throwable): Unit = {} // do nothing
+
+ override def onSuccess(result: UserRecordResult): Unit = {
+ val shardId = result.getShardId
+ val seqNumber = result.getSequenceNumber()
+ val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+ new ArrayBuffer[(Int, String)]())
+ sentSeqNumbers += ((num, seqNumber))
+ }
+ }
+
+ Futures.addCallback(future, kinesisCallBack)
+ kinesisProducer.flushSync() // make sure we send all data before returning the map
+ } else {
+ val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+ .withData(data)
+ .withPartitionKey(str)
+
+ val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+ val shardId = putRecordResult.getShardId
+ val seqNumber = putRecordResult.getSequenceNumber()
+ val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+ new ArrayBuffer[(Int, String)]())
+ sentSeqNumbers += ((num, seqNumber))
+ }
}
logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
@@ -116,7 +147,7 @@ private[kinesis] class KinesisTestUtils extends Logging {
* Expose a Python friendly API.
*/
def pushData(testData: java.util.List[Int]): Unit = {
- pushData(testData.asScala)
+ pushData(testData.asScala, aggregate = false)
}
def deleteStream(): Unit = {