aboutsummaryrefslogtreecommitdiff
path: root/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala')
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 2ef3e99c55..9c81f23c19 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -28,11 +28,11 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)
// tests the API, does not actually test data receiving
- val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
- val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+ val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder](
- kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}