diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala | 102 |
1 files changed, 73 insertions, 29 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala index 3f396a7e6b..15b28256e8 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala @@ -44,6 +44,9 @@ private[kafka010] case class CachedKafkaConsumer private( private var consumer = createConsumer + /** indicates whether this consumer is in use or not */ + private var inuse = true + /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] private var nextOffsetInFetchedData = UNKNOWN_OFFSET @@ -57,6 +60,20 @@ private[kafka010] case class CachedKafkaConsumer private( c } + case class AvailableOffsetRange(earliest: Long, latest: Long) + + /** + * Return the available offset range of the current partition. It's a pair of the earliest offset + * and the latest offset. + */ + def getAvailableOffsetRange(): AvailableOffsetRange = { + consumer.seekToBeginning(Set(topicPartition).asJava) + val earliestOffset = consumer.position(topicPartition) + consumer.seekToEnd(Set(topicPartition).asJava) + val latestOffset = consumer.position(topicPartition) + AvailableOffsetRange(earliestOffset, latestOffset) + } + /** * Get the record for the given offset if available. Otherwise it will either throw error * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), @@ -107,9 +124,9 @@ private[kafka010] case class CachedKafkaConsumer private( * `UNKNOWN_OFFSET`. */ private def getEarliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = { - val (earliestOffset, latestOffset) = getAvailableOffsetRange() - logWarning(s"Some data may be lost. Recovering from the earliest offset: $earliestOffset") - if (offset >= latestOffset || earliestOffset >= untilOffset) { + val range = getAvailableOffsetRange() + logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}") + if (offset >= range.latest || range.earliest >= untilOffset) { // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap, // either // -------------------------------------------------------- @@ -124,13 +141,13 @@ private[kafka010] case class CachedKafkaConsumer private( // offset untilOffset earliestOffset latestOffset val warningMessage = s""" - |The current available offset range is [$earliestOffset, $latestOffset). + |The current available offset range is $range. | Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be | skipped ${additionalMessage(failOnDataLoss = false)} """.stripMargin logWarning(warningMessage) UNKNOWN_OFFSET - } else if (offset >= earliestOffset) { + } else if (offset >= range.earliest) { // ----------------------------------------------------------------------------- // ^ ^ ^ ^ // | | | | @@ -149,12 +166,12 @@ private[kafka010] case class CachedKafkaConsumer private( // offset earliestOffset min(untilOffset,latestOffset) max(untilOffset, latestOffset) val warningMessage = s""" - |The current available offset range is [$earliestOffset, $latestOffset). - | Offset ${offset} is out of range, and records in [$offset, $earliestOffset) will be + |The current available offset range is $range. + | Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be | skipped ${additionalMessage(failOnDataLoss = false)} """.stripMargin logWarning(warningMessage) - earliestOffset + range.earliest } } @@ -183,8 +200,8 @@ private[kafka010] case class CachedKafkaConsumer private( // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. - val (earliestOffset, latestOffset) = getAvailableOffsetRange() - if (offset < earliestOffset || offset >= latestOffset) { + val range = getAvailableOffsetRange() + if (offset < range.earliest || offset >= range.latest) { throw new OffsetOutOfRangeException( Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) } else { @@ -284,18 +301,6 @@ private[kafka010] case class CachedKafkaConsumer private( logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") fetchedData = r.iterator } - - /** - * Return the available offset range of the current partition. It's a pair of the earliest offset - * and the latest offset. - */ - private def getAvailableOffsetRange(): (Long, Long) = { - consumer.seekToBeginning(Set(topicPartition).asJava) - val earliestOffset = consumer.position(topicPartition) - consumer.seekToEnd(Set(topicPartition).asJava) - val latestOffset = consumer.position(topicPartition) - (earliestOffset, latestOffset) - } } private[kafka010] object CachedKafkaConsumer extends Logging { @@ -310,7 +315,7 @@ private[kafka010] object CachedKafkaConsumer extends Logging { new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) { override def removeEldestEntry( entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = { - if (this.size > capacity) { + if (entry.getValue.inuse == false && this.size > capacity) { logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " + s"removing consumer for ${entry.getKey}") try { @@ -327,6 +332,43 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } + def releaseKafkaConsumer( + topic: String, + partition: Int, + kafkaParams: ju.Map[String, Object]): Unit = { + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + val topicPartition = new TopicPartition(topic, partition) + val key = CacheKey(groupId, topicPartition) + + synchronized { + val consumer = cache.get(key) + if (consumer != null) { + consumer.inuse = false + } else { + logWarning(s"Attempting to release consumer that does not exist") + } + } + } + + /** + * Removes (and closes) the Kafka Consumer for the given topic, partition and group id. + */ + def removeKafkaConsumer( + topic: String, + partition: Int, + kafkaParams: ju.Map[String, Object]): Unit = { + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + val topicPartition = new TopicPartition(topic, partition) + val key = CacheKey(groupId, topicPartition) + + synchronized { + val removedConsumer = cache.remove(key) + if (removedConsumer != null) { + removedConsumer.close() + } + } + } + /** * Get a cached consumer for groupId, assigned to topic and partition. * If matching consumer doesn't already exist, will be created using kafkaParams. @@ -342,16 +384,18 @@ private[kafka010] object CachedKafkaConsumer extends Logging { // If this is reattempt at running the task, then invalidate cache and start with // a new consumer if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) { - val removedConsumer = cache.remove(key) - if (removedConsumer != null) { - removedConsumer.close() - } - new CachedKafkaConsumer(topicPartition, kafkaParams) + removeKafkaConsumer(topic, partition, kafkaParams) + val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams) + consumer.inuse = true + cache.put(key, consumer) + consumer } else { if (!cache.containsKey(key)) { cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams)) } - cache.get(key) + val consumer = cache.get(key) + consumer.inuse = true + consumer } } } |