aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala')
-rw-r--r--external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala21
1 files changed, 7 insertions, 14 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 585ced875c..aa01238f91 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -85,14 +85,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
case None => LatestOffsets
}
- val kafkaParamsForStrategy =
+ val kafkaParamsForDriver =
ConfigUpdater("source", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
- // So that consumers in Kafka source do not mess with any existing group id
- .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
-
// Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
// offsets by itself instead of counting on KafkaConsumer.
.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -129,17 +126,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
case ("assign", value) =>
- AssignStrategy(
- JsonUtils.partitions(value),
- kafkaParamsForStrategy)
+ AssignStrategy(JsonUtils.partitions(value))
case ("subscribe", value) =>
- SubscribeStrategy(
- value.split(",").map(_.trim()).filter(_.nonEmpty),
- kafkaParamsForStrategy)
+ SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty))
case ("subscribepattern", value) =>
- SubscribePatternStrategy(
- value.trim(),
- kafkaParamsForStrategy)
+ SubscribePatternStrategy(value.trim())
case _ =>
// Should never reach here as we are already matching on
// matched strategy names
@@ -152,11 +143,13 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
new KafkaSource(
sqlContext,
strategy,
+ kafkaParamsForDriver,
kafkaParamsForExecutors,
parameters,
metadataPath,
startingOffsets,
- failOnDataLoss)
+ failOnDataLoss,
+ driverGroupIdPrefix = s"$uniqueGroupId-driver")
}
private def validateOptions(parameters: Map[String, String]): Unit = {