aboutsummaryrefslogtreecommitdiff
path: root/external/kafka/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
commitd0fd3b9ad238294346eb3465c489eabd41fb2380 (patch)
treed7f7f17fc842589f80c3ef16669741e3499d9692 /external/kafka/src/test
parent977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff)
downloadspark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.gz
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.bz2
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.zip
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java15
1 files changed, 7 insertions, 8 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 66236df662..fdea96e506 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
@@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
+ KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream(
+ JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999);
}
}