aboutsummaryrefslogtreecommitdiff
path: root/external/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala3
1 files changed, 2 insertions, 1 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 0721ddaf70..d7cf500577 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -31,6 +31,7 @@ import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
@@ -80,7 +81,7 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
- val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+ val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}