diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 21 |
1 files changed, 7 insertions, 14 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 585ced875c..aa01238f91 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -85,14 +85,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider case None => LatestOffsets } - val kafkaParamsForStrategy = + val kafkaParamsForDriver = ConfigUpdater("source", specifiedKafkaParams) .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName) .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName) - // So that consumers in Kafka source do not mess with any existing group id - .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver") - // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial // offsets by itself instead of counting on KafkaConsumer. .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") @@ -129,17 +126,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match { case ("assign", value) => - AssignStrategy( - JsonUtils.partitions(value), - kafkaParamsForStrategy) + AssignStrategy(JsonUtils.partitions(value)) case ("subscribe", value) => - SubscribeStrategy( - value.split(",").map(_.trim()).filter(_.nonEmpty), - kafkaParamsForStrategy) + SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty)) case ("subscribepattern", value) => - SubscribePatternStrategy( - value.trim(), - kafkaParamsForStrategy) + SubscribePatternStrategy(value.trim()) case _ => // Should never reach here as we are already matching on // matched strategy names @@ -152,11 +143,13 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider new KafkaSource( sqlContext, strategy, + kafkaParamsForDriver, kafkaParamsForExecutors, parameters, metadataPath, startingOffsets, - failOnDataLoss) + failOnDataLoss, + driverGroupIdPrefix = s"$uniqueGroupId-driver") } private def validateOptions(parameters: Map[String, String]): Unit = { |