diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala')
2 files changed, 33 insertions, 8 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala index 3f438e9918..3f396a7e6b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala @@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private( var toFetchOffset = offset while (toFetchOffset != UNKNOWN_OFFSET) { try { - return fetchData(toFetchOffset, pollTimeoutMs) + return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) } catch { case e: OffsetOutOfRangeException => // When there is some error thrown, it's better to use a new consumer to drop all cached @@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private( } /** - * Get the record at `offset`. + * Get the record for the given offset if available. Otherwise it will either throw error + * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), + * or null. * * @throws OffsetOutOfRangeException if `offset` is out of range * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. */ private def fetchData( offset: Long, - pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { // This is the first fetch, or the last pre-fetched data has been drained. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. @@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private( } else { val record = fetchedData.next() nextOffsetInFetchedData = record.offset + 1 - // `seek` is always called before "poll". So "record.offset" must be same as "offset". - assert(record.offset == offset, - s"The fetched data has a different offset: expected $offset but was ${record.offset}") - record + // In general, Kafka uses the specified offset as the start point, and tries to fetch the next + // available offset. Hence we need to handle offset mismatch. + if (record.offset > offset) { + // This may happen when some records aged out but their offsets already got verified + if (failOnDataLoss) { + reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})") + // Never happen as "reportDataLoss" will throw an exception + null + } else { + if (record.offset >= untilOffset) { + reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)") + null + } else { + reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") + record + } + } + } else if (record.offset < offset) { + // This should not happen. If it does happen, then we probably misunderstand Kafka internal + // mechanism. + throw new IllegalStateException( + s"Tried to fetch $offset but the returned record offset was ${record.offset}") + } else { + record + } } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index d9ab4bb4f8..92ee0ed93d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource( sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt private val offsetFetchAttemptIntervalMs = - sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong + sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong private val maxOffsetsPerTrigger = sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) |