aboutsummaryrefslogtreecommitdiff
path: root/external/kafka
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2014-01-07 01:56:15 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2014-01-07 01:56:15 -0800
commit aa99f226a691ddcb4442d60f4cd4908f434cc4ce (patch)
tree 33a1614e3d5ee7a050776e3601ba8c7430b573f8 /external/kafka
parent 3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (diff)
download spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.gz
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.bz2
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.zip
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
Diffstat (limited to 'external/kafka')
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala 107
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala 73
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 153
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala 23
-rw-r--r-- external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java 11
-rw-r--r-- external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala 8
6 files changed, 160 insertions, 215 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
deleted file mode 100644
index 491331bb37..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.api.java.kafka
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-import java.lang.{Integer => JInt}
-import java.util.{Map => JMap}
-
-import kafka.serializer.Decoder
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.kafka._
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Kafka input streams.
- */
-class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt]
- ): JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- *
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
- * @param keyDecoderClass Type of kafka key decoder
- * @param valueDecoderClass Type of kafka value decoder
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level. Defaults to memory-only
- */
- def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
- keyTypeClass: Class[K],
- valueTypeClass: Class[V],
- keyDecoderClass: Class[U],
- valueDecoderClass: Class[T],
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
- javaStreamingContext.ssc.kafkaStream[K, V, U, T](
- kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
- }
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala
deleted file mode 100644
index 2135634a69..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.reflect.ClassTag
-
-import kafka.serializer.{Decoder, StringDecoder}
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions.
- */
-class KafkaFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: Map[String, Int],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[(String, String)] = {
- val kafkaParams = Map[String, String](
- "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
- "zookeeper.connection.timeout.ms" -> "10000")
- kafkaStream[String, String, StringDecoder, StringDecoder](
- kafkaParams,
- topics,
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ): DStream[(K, V)] = {
- val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
new file mode 100644
index 0000000000..c2d851f943
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.lang.{Integer => JInt}
+import java.util.{Map => JMap}
+
+import kafka.serializer.{Decoder, StringDecoder}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+
+
+object KafkaUtils {
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param ssc StreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def createStream(
+ ssc: StreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: Map[String, Int],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[(String, String)] = {
+ val kafkaParams = Map[String, String](
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param ssc StreamingContext object
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): DStream[(K, V)] = {
+ val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt]
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ *
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param keyTypeClass Key type of RDD
+ * @param valueTypeClass value type of RDD
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel RDD storage level (required; this overload has no default).
+ */
+ def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+ jssc: JavaStreamingContext,
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val valueCmt: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+
+ implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+ implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+ createStream[K, V, U, T](
+ jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
deleted file mode 100644
index 44e7ce6e1b..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object kafka {
- implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc)
-}
-
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index fdea96e506..7b4999447e 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,32 +18,27 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
-
-import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKafkaStream() {
-
HashMap<String, Integer> topics = Maps.newHashMap();
- KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
+ JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 2ef3e99c55..9c81f23c19 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -28,11 +28,11 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)
// tests the API, does not actually test data receiving
- val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
- val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+ val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder](
- kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}