Diffstat (limited to 'external/kafka-0-10-sql/src/main')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala  39
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala  2
2 files changed, 33 insertions, 8 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9918..3f396a7e6b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
var toFetchOffset = offset
while (toFetchOffset != UNKNOWN_OFFSET) {
try {
- return fetchData(toFetchOffset, pollTimeoutMs)
+ return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
} catch {
case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
}
/**
- * Get the record at `offset`.
+ * Get the record for the given offset if available. Otherwise it will either throw an error
+ * (if failOnDataLoss = true), return the record at the next available offset within
+ * [offset, untilOffset), or return null.
*
* @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
*/
private def fetchData(
offset: Long,
- pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+ untilOffset: Long,
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
// This is the first fetch, or the last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private(
} else {
val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1
- // `seek` is always called before "poll". So "record.offset" must be same as "offset".
- assert(record.offset == offset,
- s"The fetched data has a different offset: expected $offset but was ${record.offset}")
- record
+ // In general, Kafka treats the requested offset as a starting point and returns the next
+ // available record, so the fetched offset may be larger than requested. Handle the mismatch.
+ if (record.offset > offset) {
+ // This may happen when some records have aged out but their offsets were already verified
+ if (failOnDataLoss) {
+ reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
+ // Never reached, as `reportDataLoss` throws an exception when failOnDataLoss is true
+ null
+ } else {
+ if (record.offset >= untilOffset) {
+ reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+ null
+ } else {
+ reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+ record
+ }
+ }
+ } else if (record.offset < offset) {
+ // This should not happen. If it does, we have probably misunderstood Kafka's internal
+ // mechanics.
+ throw new IllegalStateException(
+ s"Tried to fetch $offset but the returned record offset was ${record.offset}")
+ } else {
+ record
+ }
}
}
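
To make the new fetchData contract concrete, here is a standalone sketch of the same three-way offset comparison, lifted out of the consumer class. The object name and the simplified reportDataLoss helper (which prints instead of logging on the non-fatal path) are illustrative stand-ins for the private members above, not the actual Spark implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord

// Standalone sketch of the offset-mismatch handling added to `fetchData`.
// `reportDataLoss` is a simplified stand-in for the class's private helper:
// it throws when failOnDataLoss is true and only warns otherwise.
object OffsetMismatchSketch {

  private def reportDataLoss(failOnDataLoss: Boolean, message: String): Unit = {
    if (failOnDataLoss) {
      throw new IllegalStateException(message)
    } else {
      println(s"WARN: $message. Some data may have been lost.")
    }
  }

  // Decide what to return for a record that Kafka handed back after seeking
  // to `offset`. Mirrors the three-way comparison in the diff above.
  def resolve(
      record: ConsumerRecord[Array[Byte], Array[Byte]],
      offset: Long,
      untilOffset: Long,
      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    if (record.offset > offset) {
      // Offsets in [offset, record.offset) no longer exist, e.g. aged out by retention.
      if (failOnDataLoss) {
        reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
        null // unreachable: the call above always throws
      } else if (record.offset >= untilOffset) {
        reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
        null // the whole requested range is gone; nothing to return
      } else {
        reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
        record // resume from the first offset that still exists
      }
    } else if (record.offset < offset) {
      // Kafka seeks forward, never backward; anything else is a logic error.
      throw new IllegalStateException(
        s"Tried to fetch $offset but the returned record offset was ${record.offset}")
    } else {
      record // exact match: the common case
    }
  }
}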
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d9ab4bb4f8..92ee0ed93d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
private val offsetFetchAttemptIntervalMs =
- sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+ sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
private val maxOffsetsPerTrigger =
sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
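
Both settings touched by this change are plain source options read from the option map, so they can be overridden per query. A minimal usage sketch, assuming a broker at localhost:9092 and a topic named "events" (both placeholders):

import org.apache.spark.sql.SparkSession

// Sketch: overriding the options touched by this change from the reader side.
// "localhost:9092" and the topic "events" are placeholders.
val spark = SparkSession.builder().appName("kafka-source-options").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  // Retry interval whose default moves from 10 ms to 1000 ms above.
  .option("fetchOffset.retryIntervalMs", "500")
  // Number of retries, defaulting to 3 as shown in the surrounding context.
  .option("fetchOffset.numRetries", "3")
  // Controls the failOnDataLoss flag that fetchData now receives.
  .option("failOnDataLoss", "false")
  .load()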