 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala         | 55 +++++++++++++++++++++++++++++++++++--------------------
 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 19 +++++++++++-------
 2 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1be70db874..4b0bb0a0f7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
executorKafkaParams: ju.Map[String, Object],
sourceOptions: Map[String, String],
metadataPath: String,
+ startFromEarliestOffset: Boolean,
failOnDataLoss: Boolean)
extends Source with Logging {
@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
private lazy val initialPartitionOffsets = {
val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
metadataLog.get(0).getOrElse {
- val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+ val offsets = if (startFromEarliestOffset) {
+ KafkaSourceOffset(fetchEarliestOffsets())
+ } else {
+ KafkaSourceOffset(fetchLatestOffsets())
+ }
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
offsets
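
The block above resolves the query's starting point exactly once: on the first run it fetches offsets from Kafka (earliest or latest, per the new flag) and records them as batch 0 in the metadata log; on restart it recovers the recorded offsets instead of re-fetching. A minimal sketch of that compute-once pattern, using a hypothetical in-memory stand-in for HDFSMetadataLog (the real class persists batch metadata to the checkpoint directory):

    // Hypothetical in-memory stand-in for HDFSMetadataLog, for illustration only.
    class InMemoryMetadataLog[T] {
      private var entries = Map.empty[Long, T]
      def get(batchId: Long): Option[T] = entries.get(batchId)
      def add(batchId: Long, metadata: T): Boolean = { entries += batchId -> metadata; true }
    }

    // Resolve initial offsets once and make them durable before returning them,
    // so a restarted query sees exactly the same starting point.
    def resolveInitialOffsets(
        log: InMemoryMetadataLog[Map[String, Long]],
        fetchEarliest: () => Map[String, Long],
        fetchLatest: () => Map[String, Long],
        startFromEarliest: Boolean): Map[String, Long] = {
      log.get(0).getOrElse {
        val offsets = if (startFromEarliest) fetchEarliest() else fetchLatest()
        log.add(0, offsets) // persist before use
        offsets
      }
    }
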
@@ -123,7 +128,7 @@ private[kafka010] case class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
- val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+ val offset = KafkaSourceOffset(fetchLatestOffsets())
logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
Some(offset)
}
@@ -227,26 +232,34 @@ private[kafka010] case class KafkaSource(
override def toString(): String = s"KafkaSource[$consumerStrategy]"
/**
- * Fetch the offset of a partition, either seek to the latest offsets or use the current offsets
- * in the consumer.
+ * Fetch the earliest offsets of partitions.
*/
- private def fetchPartitionOffsets(
- seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
- // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
- assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+ private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
- logDebug(s"Partitioned assigned to consumer: $partitions")
+ logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")
- // Get the current or latest offset of each partition
- if (seekToEnd) {
- consumer.seekToEnd(partitions)
- logDebug("Seeked to the end")
- }
+ consumer.seekToBeginning(partitions)
+ val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+ logDebug(s"Got earliest offsets for partition : $partitionOffsets")
+ partitionOffsets
+ }
+
+ /**
+ * Fetch the latest offsets of partitions.
+ */
+ private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+ // Poll to get the latest assigned partitions
+ consumer.poll(0)
+ val partitions = consumer.assignment()
+ consumer.pause(partitions)
+ logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
+
+ consumer.seekToEnd(partitions)
val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got offsets for partition : $partitionOffsets")
+ logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
}
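
Both helpers rely on the same kafka-clients 0.10 idiom: poll(0) forces a partition assignment, pause keeps later polls from actually fetching records, and seek followed by position reads back each partition's offset. A standalone sketch of the latest-offsets variant; the broker address and topic name are placeholders:

    import java.util.{Collections, Properties}
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("group.id", "offset-probe")            // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.subscribe(Collections.singletonList("topic")) // placeholder topic

    consumer.poll(0)                          // triggers partition assignment
    val partitions = consumer.assignment()
    consumer.pause(partitions)                // later polls return no records

    consumer.seekToEnd(partitions)            // or seekToBeginning(partitions)
    val latest: Map[TopicPartition, Long] =
      partitions.asScala.map(p => p -> consumer.position(p)).toMap
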
@@ -256,22 +269,21 @@ private[kafka010] case class KafkaSource(
*/
private def fetchNewPartitionEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
- // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
- assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
+ consumer.pause(partitions)
logDebug(s"\tPartitioned assigned to consumer: $partitions")
// Get the earliest offset of each partition
consumer.seekToBeginning(partitions)
- val partitionToOffsets = newPartitions.filter { p =>
+ val partitionOffsets = newPartitions.filter { p =>
// When deleting topics happen at the same time, some partitions may not be in `partitions`.
// So we need to ignore them
partitions.contains(p)
}.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got offsets for new partitions: $partitionToOffsets")
- partitionToOffsets
+ logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+ partitionOffsets
}
/**
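
The filter in the hunk above guards a race with topic deletion: a partition discovered earlier may no longer be in the consumer's assignment, and asking for its position would fail, so only still-assigned partitions are queried. Reusing `consumer` from the previous sketch (`newPartitions` is a placeholder for partitions first seen in the latest offset snapshot):

    val newPartitions = Seq(new TopicPartition("topic", 1)) // placeholder
    val assigned = consumer.assignment().asScala.toSet
    val newPartitionOffsets = newPartitions
      .filter(assigned.contains)              // partitions of deleted topics drop out
      .map(p => p -> consumer.position(p))
      .toMap
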
@@ -284,6 +296,9 @@ private[kafka010] case class KafkaSource(
*/
private def withRetriesWithoutInterrupt(
body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+ // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+ assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+
synchronized {
var result: Option[Map[TopicPartition, Long]] = None
var attempt = 1
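
Hoisting the StreamExecutionThread assertion into withRetriesWithoutInterrupt checks every offset fetch in one place rather than in each helper: each fetch must run on the stream execution thread so that KafkaConsumer.poll is never interrupted mid-call (KAFKA-1894). A hedged sketch of the wrapper's overall shape, with a made-up attempt budget; the real method additionally runs the body uninterruptibly:

    object FetchRetry {
      // Sketch only: retry the body a bounded number of times, rethrowing the
      // last failure if no attempt succeeds. maxAttempts is a placeholder.
      def withRetries[T](maxAttempts: Int)(body: => T): T = synchronized {
        var result: Option[T] = None
        var attempt = 1
        var lastException: Throwable = null
        while (result.isEmpty && attempt <= maxAttempts) {
          try {
            result = Some(body)
          } catch {
            case e: Throwable =>
              lastException = e
              attempt += 1
          }
        }
        result.getOrElse(throw lastException)
      }
    }
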
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 1b0a2fe955..23b1b60f3b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
- val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
- case Some(value) => value.trim() // same values as those supported by auto.offset.reset
- case None => "latest"
- }
+ val startFromEarliestOffset =
+ caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
+ case Some("latest") => false
+ case Some("earliest") => true
+ case Some(pos) =>
+ // This should not happen since we have already checked the options.
+ throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
+ case None => false
+ }
val kafkaParamsForStrategy =
ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
// So that consumers in Kafka source do not mess with any existing group id
.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
- // So that consumers can start from earliest or latest
- .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+ // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
+ // by itself instead of counting on KafkaConsumer.
+ .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// So that consumers in the driver does not commit offsets unnecessarily
.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
kafkaParamsForExecutors,
parameters,
metadataPath,
+ startFromEarliestOffset,
failOnDataLoss)
}