aboutsummaryrefslogtreecommitdiff
path: root/external/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala4
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala4
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala2
3 files changed, 5 insertions, 5 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a087474d3..54d8c8b03f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -58,11 +58,11 @@ class DirectKafkaInputDStream[
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
- ) extends InputDStream[R](ssc_) with Logging {
+ ) extends InputDStream[R](_ssc) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 67f2360896..89d1811c99 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -48,12 +48,12 @@ class KafkaInputDStream[
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
+ ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 80e2df62de..7b9aee39ff 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -50,7 +50,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
private var ssc: StreamingContext = _
private var tempDirectory: File = null
- override def beforeAll() : Unit = {
+ override def beforeAll(): Unit = {
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()