path: root/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala | 102
1 file changed, 73 insertions(+), 29 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f396a7e6b..15b28256e8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -44,6 +44,9 @@ private[kafka010] case class CachedKafkaConsumer private(
private var consumer = createConsumer
+ /** indicates whether this consumer is in use or not */
+ private var inuse = true
+
/** Iterator to the already fetched data */
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
private var nextOffsetInFetchedData = UNKNOWN_OFFSET
@@ -57,6 +60,20 @@ private[kafka010] case class CachedKafkaConsumer private(
c
}
+ case class AvailableOffsetRange(earliest: Long, latest: Long)
+
+ /**
+ * Return the available offset range of the current partition. It's a pair of the earliest offset
+ * and the latest offset.
+ */
+ def getAvailableOffsetRange(): AvailableOffsetRange = {
+ consumer.seekToBeginning(Set(topicPartition).asJava)
+ val earliestOffset = consumer.position(topicPartition)
+ consumer.seekToEnd(Set(topicPartition).asJava)
+ val latestOffset = consumer.position(topicPartition)
+ AvailableOffsetRange(earliestOffset, latestOffset)
+ }
+
/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
@@ -107,9 +124,9 @@ private[kafka010] case class CachedKafkaConsumer private(
* `UNKNOWN_OFFSET`.
*/
private def getEarliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = {
- val (earliestOffset, latestOffset) = getAvailableOffsetRange()
- logWarning(s"Some data may be lost. Recovering from the earliest offset: $earliestOffset")
- if (offset >= latestOffset || earliestOffset >= untilOffset) {
+ val range = getAvailableOffsetRange()
+ logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}")
+ if (offset >= range.latest || range.earliest >= untilOffset) {
// [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
// either
// --------------------------------------------------------
@@ -124,13 +141,13 @@ private[kafka010] case class CachedKafkaConsumer private(
// offset untilOffset earliestOffset latestOffset
val warningMessage =
s"""
- |The current available offset range is [$earliestOffset, $latestOffset).
+ |The current available offset range is $range.
| Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be
| skipped ${additionalMessage(failOnDataLoss = false)}
""".stripMargin
logWarning(warningMessage)
UNKNOWN_OFFSET
- } else if (offset >= earliestOffset) {
+ } else if (offset >= range.earliest) {
// -----------------------------------------------------------------------------
// ^ ^ ^ ^
// | | | |
@@ -149,12 +166,12 @@ private[kafka010] case class CachedKafkaConsumer private(
// offset earliestOffset min(untilOffset,latestOffset) max(untilOffset, latestOffset)
val warningMessage =
s"""
- |The current available offset range is [$earliestOffset, $latestOffset).
- | Offset ${offset} is out of range, and records in [$offset, $earliestOffset) will be
+ |The current available offset range is $range.
+ | Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
| skipped ${additionalMessage(failOnDataLoss = false)}
""".stripMargin
logWarning(warningMessage)
- earliestOffset
+ range.earliest
}
}
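For reference, the branch structure in the hunk above reduces to a small pure function. Below is a minimal, standalone Scala sketch (not the Spark code itself) of that three-way decision, assuming the same AvailableOffsetRange shape; UNKNOWN_OFFSET is a stand-in for the sentinel defined in the companion object.

// Standalone sketch of the recovery decision in getEarliestAvailableOffsetBetween.
case class AvailableOffsetRange(earliest: Long, latest: Long)
val UNKNOWN_OFFSET = -2L  // stand-in for the sentinel in the companion object

def recoverFrom(offset: Long, untilOffset: Long, range: AvailableOffsetRange): Long = {
  if (offset >= range.latest || range.earliest >= untilOffset) {
    // [offset, untilOffset) and [earliest, latest) do not overlap: nothing to read.
    UNKNOWN_OFFSET
  } else if (offset >= range.earliest) {
    // The requested offset is still available: keep reading from it.
    offset
  } else {
    // Records in [offset, earliest) were lost: skip ahead to the earliest available offset.
    range.earliest
  }
}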
@@ -183,8 +200,8 @@ private[kafka010] case class CachedKafkaConsumer private(
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
- val (earliestOffset, latestOffset) = getAvailableOffsetRange()
- if (offset < earliestOffset || offset >= latestOffset) {
+ val range = getAvailableOffsetRange()
+ if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else {
@@ -284,18 +301,6 @@ private[kafka010] case class CachedKafkaConsumer private(
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
fetchedData = r.iterator
}
-
- /**
- * Return the available offset range of the current partition. It's a pair of the earliest offset
- * and the latest offset.
- */
- private def getAvailableOffsetRange(): (Long, Long) = {
- consumer.seekToBeginning(Set(topicPartition).asJava)
- val earliestOffset = consumer.position(topicPartition)
- consumer.seekToEnd(Set(topicPartition).asJava)
- val latestOffset = consumer.position(topicPartition)
- (earliestOffset, latestOffset)
- }
}
private[kafka010] object CachedKafkaConsumer extends Logging {
@@ -310,7 +315,7 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
- if (this.size > capacity) {
+ if (entry.getValue.inuse == false && this.size > capacity) {
logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
s"removing consumer for ${entry.getKey}")
try {
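The change to removeEldestEntry above ties eviction to the new inuse flag: the access-ordered LinkedHashMap may only drop its eldest consumer once nothing is using it. Here is a simplified, standalone sketch of that policy (not the Spark code; PinnableCache and isPinned are illustrative names).

import java.{util => ju}

// Access-ordered LRU map that refuses to evict entries still marked as in use.
class PinnableCache[K, V](capacity: Int, isPinned: V => Boolean)
  extends ju.LinkedHashMap[K, V](capacity, 0.75f, true) {

  override def removeEldestEntry(eldest: ju.Map.Entry[K, V]): Boolean = {
    // Evict only when over capacity and the eldest value is not pinned,
    // mirroring `entry.getValue.inuse == false && this.size > capacity` above.
    !isPinned(eldest.getValue) && this.size > capacity
  }
}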
@@ -327,6 +332,43 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
}
}
+ def releaseKafkaConsumer(
+ topic: String,
+ partition: Int,
+ kafkaParams: ju.Map[String, Object]): Unit = {
+ val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+ val topicPartition = new TopicPartition(topic, partition)
+ val key = CacheKey(groupId, topicPartition)
+
+ synchronized {
+ val consumer = cache.get(key)
+ if (consumer != null) {
+ consumer.inuse = false
+ } else {
+ logWarning(s"Attempting to release consumer that does not exist")
+ }
+ }
+ }
+
+ /**
+ * Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
+ */
+ def removeKafkaConsumer(
+ topic: String,
+ partition: Int,
+ kafkaParams: ju.Map[String, Object]): Unit = {
+ val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+ val topicPartition = new TopicPartition(topic, partition)
+ val key = CacheKey(groupId, topicPartition)
+
+ synchronized {
+ val removedConsumer = cache.remove(key)
+ if (removedConsumer != null) {
+ removedConsumer.close()
+ }
+ }
+ }
+
/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using kafkaParams.
@@ -342,16 +384,18 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) {
- val removedConsumer = cache.remove(key)
- if (removedConsumer != null) {
- removedConsumer.close()
- }
- new CachedKafkaConsumer(topicPartition, kafkaParams)
+ removeKafkaConsumer(topic, partition, kafkaParams)
+ val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
+ consumer.inuse = true
+ cache.put(key, consumer)
+ consumer
} else {
if (!cache.containsKey(key)) {
cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
}
- cache.get(key)
+ val consumer = cache.get(key)
+ consumer.inuse = true
+ consumer
}
}
}
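A hypothetical caller-side sketch (not part of this diff) of how the acquire/release pair is meant to be used from an executor task: getOrCreate marks the cached consumer as in use, and the release is triggered when the task completes. The names readPartition, topic, partition and kafkaParams are placeholders, and the completion-listener wiring is only one plausible way to trigger the release; since the API is package-private, a real caller would live in org.apache.spark.sql.kafka010.

import java.{util => ju}
import org.apache.spark.TaskContext

def readPartition(topic: String, partition: Int, kafkaParams: ju.Map[String, Object]): Unit = {
  // getOrCreate marks the cached consumer as in use, so the LRU cache will not evict it.
  val consumer = CachedKafkaConsumer.getOrCreate(topic, partition, kafkaParams)
  // Hand the consumer back (inuse = false) once the task finishes, making it
  // eligible for reuse by later tasks or for eviction when over capacity.
  TaskContext.get().addTaskCompletionListener { (_: TaskContext) =>
    CachedKafkaConsumer.releaseKafkaConsumer(topic, partition, kafkaParams)
  }
  // ... consume records via consumer.get(...) between the desired offsets ...
}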