diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
commit | f4e40661912af2a23e250a49f72f00675172e2de (patch) | |
tree | 97d40d041b08bf8a320f908e7b241cce9432c014 /external/kafka | |
parent | 6e43039614ed1ec55a134fb82fb3e8d4e80996ef (diff) | |
download | spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.gz spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.bz2 spark-f4e40661912af2a23e250a49f72f00675172e2de.zip |
Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects.
Diffstat (limited to 'external/kafka')
7 files changed, 477 insertions, 0 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala new file mode 100644 index 0000000000..ab0e8a6c8d --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import java.lang.{Integer => JInt} +import java.util.{Map => JMap} + +import kafka.serializer.Decoder + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Kafka input streams. + */ +class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt] + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + * + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. Defaults to memory-only + */ + def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]]( + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + ssc.kafkaStream[K, V, U, T]( + kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala new file mode 100644 index 0000000000..2135634a69 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag + +import kafka.serializer.{Decoder, StringDecoder} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions. + */ +class KafkaFunctions(ssc: StreamingContext) { + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[(String, String)] = { + val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, + topics, + storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + */ + def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala new file mode 100644 index 0000000000..fd69328aba --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.collection.Map +import scala.reflect.ClassTag + +import java.util.Properties +import java.util.concurrent.Executors + +import kafka.consumer._ +import kafka.serializer.Decoder +import kafka.utils.VerifiableProperties +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient._ + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + */ +private[streaming] +class KafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + @transient ssc_ : StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[(K, V)] = { + new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + .asInstanceOf[NetworkReceiver[(K, V)]] + } +} + +private[streaming] +class KafkaReceiver[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + + // Handles pushing data into the BlockManager + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + // Connection to Kafka + var consumerConnector : ConsumerConnector = null + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // In case we are using multiple Threads to handle Kafka Messages + val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) + + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) + + // Kafka connection properties + val props = new Properties() + kafkaParams.foreach(param => props.put(param._1, param._2)) + + // Create the connection to the cluster + logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig) + logInfo("Connected to " + kafkaParams("zookeeper.connect")) + + // When autooffset.reset is defined, it is our responsibility to try and whack the + // consumer group zk node. + if (kafkaParams.contains("auto.offset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) + } + + val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + // Create Threads for each Topic/Message Stream we are listening + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) + + + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + } + + // Handles Kafka Messages + private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) + extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + for (msgAndMetadata <- stream) { + blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) + } + } + } + + // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because + // Kafka 0.7.2 only honors this param when the group is not in zookeeper. + // + // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' + // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': + // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala + private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { + try { + val dir = "/consumers/" + groupId + logInfo("Cleaning up temporary zookeeper data under " + dir + ".") + val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + zk.deleteRecursive(dir) + zk.close() + } catch { + case _ : Throwable => // swallow + } + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala new file mode 100644 index 0000000000..44e7ce6e1b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object kafka { + implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc) +} + diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java new file mode 100644 index 0000000000..66236df662 --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.util.HashMap; +import org.junit.Test; +import com.google.common.collect.Maps; +import kafka.serializer.StringDecoder; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; + +public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { + @Test + public void testKafkaStream() { + + HashMap<String, Integer> topics = Maps.newHashMap(); + JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); + + // tests the API, does not actually test data receiving + JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + HashMap<String, String> kafkaParams = Maps.newHashMap(); + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream( + String.class, String.class, StringDecoder.class, StringDecoder.class, + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999); + } +} diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/kafka/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala new file mode 100644 index 0000000000..2ef3e99c55 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import kafka.serializer.StringDecoder +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class KafkaStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val topics = Map("my-topic" -> 1) + + // tests the API, does not actually test data receiving + val test1 = ssc.kafkaStream("localhost:12345", "group", topics) + val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) + val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") + val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} |