diff options
Diffstat (limited to 'external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala')
-rw-r--r-- | external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala new file mode 100644 index 0000000000..3713bda41b --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.util.Properties + +import scala.collection.Map +import scala.reflect.{classTag, ClassTag} + +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream} +import kafka.serializer.Decoder +import kafka.utils.VerifiableProperties + +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.util.ThreadUtils + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param kafkaParams Map of kafka configuration parameters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + */ +private[streaming] +class KafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: ClassTag, + T <: Decoder[_]: ClassTag]( + _ssc: StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + useReliableReceiver: Boolean, + storageLevel: StorageLevel + ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging { + + def getReceiver(): Receiver[(K, V)] = { + if (!useReliableReceiver) { + new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + } else { + new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + } + } +} + +private[streaming] +class KafkaReceiver[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: ClassTag, + T <: Decoder[_]: ClassTag]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends Receiver[(K, V)](storageLevel) with Logging { + + // Connection to Kafka + var consumerConnector: ConsumerConnector = null + + def onStop() { + if (consumerConnector != null) { + consumerConnector.shutdown() + consumerConnector = null + } + } + + def onStart() { + + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) + + // Kafka connection properties + val props = new Properties() + kafkaParams.foreach(param => props.put(param._1, param._2)) + + val zkConnect = kafkaParams("zookeeper.connect") + // Create the connection to the cluster + logInfo("Connecting to Zookeeper: " + zkConnect) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig) + logInfo("Connected to " + zkConnect) + + val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + // Create threads for each topic/message Stream we are listening + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) + + val executorPool = + ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") + try { + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + } finally { + executorPool.shutdown() // Just causes threads to terminate after work is done + } + } + + // Handles Kafka messages + private class MessageHandler(stream: KafkaStream[K, V]) + extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + try { + val streamIterator = stream.iterator() + while (streamIterator.hasNext()) { + val msgAndMetadata = streamIterator.next() + store((msgAndMetadata.key, msgAndMetadata.message)) + } + } catch { + case e: Throwable => reportError("Error handling message; exiting", e) + } + } + } +} |