author    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-06 01:47:53 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-01-06 01:47:53 -0800
commit    d0fd3b9ad238294346eb3465c489eabd41fb2380 (patch)
tree      d7f7f17fc842589f80c3ef16669741e3499d9692 /external/kafka
parent    977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff)
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/kafka')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala (renamed from external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala) | 12
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 15
2 files changed, 13 insertions(+), 14 deletions(-)
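The substance of the change for Java users: Kafka input streams no longer come from a JavaStreamingContext subclass but from a small wrapper class constructed around the context. A minimal before/after sketch follows; the topic name, thread count, and ZooKeeper address are placeholders modeled on the test file below, not values from this commit.

// Before this commit: Kafka methods lived on a JavaStreamingContext subclass.
// JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(jssc);
// JavaPairDStream<String, String> stream =
//     sscWithKafka.kafkaStream("localhost:2181", "consumer-group", topics);

// After this commit: a plain wrapper adds the Kafka-specific methods.
import java.util.HashMap;

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;

public class KafkaUsageSketch {
  public static void main(String[] args) {
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "KafkaUsageSketch", new Duration(1000));

    // Map of topic name -> number of consumer threads; "events" is illustrative.
    HashMap<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("events", 1);

    KafkaFunctions kafkaFunc = new KafkaFunctions(jssc);
    JavaPairDStream<String, String> stream =
        kafkaFunc.kafkaStream("localhost:2181", "consumer-group", topics);
  }
}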
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
index ab0e8a6c8d..491331bb37 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.kafka
+package org.apache.spark.streaming.api.java.kafka
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -27,13 +27,13 @@ import kafka.serializer.Decoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.kafka._
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating Kafka input streams.
*/
-class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create an input stream that pulls messages from a Kafka Broker.
@@ -49,7 +49,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
/**
@@ -69,7 +69,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
/**
@@ -101,7 +101,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
- ssc.kafkaStream[K, V, U, T](
+ javaStreamingContext.ssc.kafkaStream[K, V, U, T](
kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
}
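Because KafkaFunctions no longer extends JavaStreamingContext, every overload above now delegates explicitly through javaStreamingContext.ssc rather than inheriting the underlying context. The generic overload in the last hunk takes key/value types, decoders, Kafka parameters, and a storage level. A hedged usage sketch, continuing the earlier example (it reuses the imports seen in the test file below — Maps, StringDecoder, StorageLevel — and the connection values are placeholders):

// Assumes jssc, topics, and kafkaFunc from the earlier sketch.
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:2181");  // placeholder ZooKeeper address
kafkaParams.put("group.id", "consumer-group");

JavaPairDStream<String, String> stream = kafkaFunc.kafkaStream(
    String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());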
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 66236df662..fdea96e506 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
@@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
+ KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream(
+ JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999);
}
}
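The check deleted at the end of the test is the behavioral cost of the rename: KafkaFunctions is no longer itself a StreamingContext, so the old assertion that it also answers socketTextStream no longer applies. General-purpose input streams must now be requested from the JavaStreamingContext directly. A one-line sketch of the replacement pattern, using the same placeholder host and port as the removed line (JavaDStream comes from org.apache.spark.streaming.api.java):

// General (non-Kafka) streams now come from the context, not the wrapper:
JavaDStream<String> socketStream = jssc.socketTextStream("localhost", 9999);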