author     cody koeninger <cody@koeninger.org>  2016-10-27 10:30:59 -0700
committer  Shixiong Zhu <shixiong@databricks.com>  2016-10-27 10:30:59 -0700
commit     104232580528c097a284d753adb5795f6de8b0a5 (patch)
tree       3c7147652132b67b0f9bf4e9fe83f602c6c35127 /external
parent     701a9d361b3045a25c42b3c0e44e7755d45ff78c (diff)
[SPARK-17813][SQL][KAFKA] Maximum data per trigger
## What changes were proposed in this pull request?

maxOffsetsPerTrigger option for rate limiting, proportionally based on volume of different topicpartitions.

## How was this patch tested?

Added unit test

Author: cody koeninger <cody@koeninger.org>

Closes #15527 from koeninger/SPARK-17813.
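Usage, as exercised by the new unit test: a minimal sketch of wiring the option into a streaming Kafka reader (the broker address, topic name, and limit below are placeholders, not from this patch):

```scala
// Cap each trigger at 10000 records, split proportionally
// across the subscribed topic's partitions.
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder broker
  .option("subscribe", "events")                    // placeholder topic
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 10000)
  .load()
```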
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala      | 107
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala |  71
2 files changed, 151 insertions, 27 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 537b7b0baa..61cba737d1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -96,6 +96,9 @@ private[kafka010] case class KafkaSource(
private val offsetFetchAttemptIntervalMs =
sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+ private val maxOffsetsPerTrigger =
+ sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
/**
* A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
* offsets and never commits them.
@@ -121,6 +124,8 @@ private[kafka010] case class KafkaSource(
}.partitionToOffsets
}
+ private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
override def schema: StructType = KafkaSource.kafkaSchema
/** Returns the maximum available offset for this source. */
@@ -128,9 +133,54 @@ private[kafka010] case class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
- val offset = KafkaSourceOffset(fetchLatestOffsets())
- logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
- Some(offset)
+ val latest = fetchLatestOffsets()
+ val offsets = maxOffsetsPerTrigger match {
+ case None =>
+ latest
+ case Some(limit) if currentPartitionOffsets.isEmpty =>
+ rateLimit(limit, initialPartitionOffsets, latest)
+ case Some(limit) =>
+ rateLimit(limit, currentPartitionOffsets.get, latest)
+ }
+
+ currentPartitionOffsets = Some(offsets)
+ logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+ Some(KafkaSourceOffset(offsets))
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: Map[TopicPartition, Long],
+ until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+ val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ logDebug(s"rateLimit $tp prorated amount is $prorate")
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ logDebug(s"rateLimit $tp new offset is $off")
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
}
/**
@@ -153,11 +203,7 @@ private[kafka010] case class KafkaSource(
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
- val newPartitionOffsets = if (newPartitions.nonEmpty) {
- fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
- } else {
- Map.empty[TopicPartition, Long]
- }
+ val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
if (newPartitionOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
@@ -221,6 +267,12 @@ private[kafka010] case class KafkaSource(
logInfo("GetBatch generating RDD of offset range: " +
offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
+
+ // On recovery, getBatch will get called before getOffset
+ if (currentPartitionOffsets.isEmpty) {
+ currentPartitionOffsets = Some(untilPartitionOffsets)
+ }
+
sqlContext.createDataFrame(rdd, schema)
}
@@ -305,23 +357,28 @@ private[kafka010] case class KafkaSource(
* some partitions if they are deleted.
*/
private def fetchNewPartitionEarliestOffsets(
- newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
- // Poll to get the latest assigned partitions
- consumer.poll(0)
- val partitions = consumer.assignment()
- consumer.pause(partitions)
- logDebug(s"\tPartitions assigned to consumer: $partitions")
-
- // Get the earliest offset of each partition
- consumer.seekToBeginning(partitions)
- val partitionOffsets = newPartitions.filter { p =>
- // When topic deletions happen at the same time, some partitions may not be in `partitions`.
- // So we need to ignore them
- partitions.contains(p)
- }.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
- partitionOffsets
- }
+ newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
+ if (newPartitions.isEmpty) {
+ Map.empty[TopicPartition, Long]
+ } else {
+ withRetriesWithoutInterrupt {
+ // Poll to get the latest assigned partitions
+ consumer.poll(0)
+ val partitions = consumer.assignment()
+ consumer.pause(partitions)
+ logDebug(s"\tPartitions assigned to consumer: $partitions")
+
+ // Get the earliest offset of each partition
+ consumer.seekToBeginning(partitions)
+ val partitionOffsets = newPartitions.filter { p =>
+ // When topic deletions happen at the same time, some partitions may not be in
+ // `partitions`. So we need to ignore them
+ partitions.contains(p)
+ }.map(p => p -> consumer.position(p)).toMap
+ logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+ partitionOffsets
+ }
+ }
/**
* Helper function that does multiple retries on a body of code that returns offsets.
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index b50688ecb7..ed4cc75920 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -23,13 +23,14 @@ import scala.util.Random
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest }
import org.apache.spark.sql.test.SharedSQLContext
-
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
protected var testUtils: KafkaTestUtils = _
@@ -133,6 +134,72 @@ class KafkaSourceSuite extends KafkaSourceTest {
private val topicId = new AtomicInteger(0)
+ test("maxOffsetsPerTrigger") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 3)
+ testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+ testUtils.sendMessages(topic, Array("1"), Some(2))
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("maxOffsetsPerTrigger", 10)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 1 from smallest, 1 from middle, 8 from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // smallest now empty, 1 more from middle, 9 more from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+ 11, 108, 109, 110, 111, 112, 113, 114, 115, 116
+ ),
+ StopStream,
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // smallest now empty, 1 more from middle, 9 more from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+ 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+ 12, 117, 118, 119, 120, 121, 122, 123, 124, 125
+ ),
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // smallest now empty, 1 more from middle, 9 more from biggest
+ CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+ 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+ 12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+ 13, 126, 127, 128, 129, 130, 131, 132, 133, 134
+ )
+ )
+ }
+
test("cannot stop Kafka stream") {
val topic = newTopic()
testUtils.createTopic(newTopic(), partitions = 5)
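
The expected answers in the test above follow from the proration rule in `rateLimit`: on the first trigger the three partitions hold 101, 11, and 1 records, so with a limit of 10 the shares are 10 * 101/113 ≈ 8.94 (floored to 8) and 10 * 11/113 ≈ 0.97 and 10 * 1/113 ≈ 0.09 (each below 1, so ceiled to 1). A standalone sketch of that arithmetic (the object and method names here are illustrative, not part of the patch):

```scala
object RateLimitSketch {
  // Prorate `limit` offsets across partitions in proportion to how many
  // records each one has available, mirroring KafkaSource.rateLimit.
  def prorate(limit: Long, sizes: Map[Int, Long]): Map[Int, Long] = {
    val total = sizes.values.sum.toDouble
    sizes.map { case (p, size) =>
      val share = limit * (size / total)
      // Shares below 1 are rounded up so small partitions are not starved;
      // larger shares are rounded down so the batch stays within the limit.
      p -> (if (share < 1) math.ceil(share) else math.floor(share)).toLong
    }
  }

  def main(args: Array[String]): Unit = {
    // First trigger of the test: limit 10, partitions hold 101, 11, 1 records
    println(prorate(10, Map(0 -> 101L, 1 -> 11L, 2 -> 1L)))
    // Map(0 -> 8, 1 -> 1, 2 -> 1): 8 from the biggest, 1 each from the others
  }
}
```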