Diffstat (limited to 'external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala')
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala  | 147
1 file changed, 147 insertions(+), 0 deletions(-)
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
new file mode 100644
index 0000000000..3713bda41b
--- /dev/null
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.collection.Map
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Input stream that pulls messages from a Kafka broker.
+ *
+ * @param kafkaParams Map of Kafka configuration parameters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numThreads) to consume. Each of the numThreads
+ * streams created for a topic is consumed in its own thread.
+ * @param storageLevel RDD storage level.
+ */
+private[streaming]
+class KafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ _ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ useReliableReceiver: Boolean,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
+
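+  // When useReliableReceiver is set (via spark.streaming.receiver.writeAheadLog.enable),
+  // offsets are committed to ZooKeeper only after received data is stored reliably.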
+ def getReceiver(): Receiver[(K, V)] = {
+ if (!useReliableReceiver) {
+ new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ } else {
+ new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ }
+ }
+}
+
+private[streaming]
+class KafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends Receiver[(K, V)](storageLevel) with Logging {
+
+ // Connection to Kafka
+ var consumerConnector: ConsumerConnector = null
+
+ def onStop() {
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ consumerConnector = null
+ }
+ }
+
+ def onStart() {
+
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
+
+ // Kafka connection properties
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+
+ val zkConnect = kafkaParams("zookeeper.connect")
+ // Create the connection to the cluster
+ logInfo("Connecting to Zookeeper: " + zkConnect)
+ val consumerConfig = new ConsumerConfig(props)
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo("Connected to " + zkConnect)
+
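+    // Instantiate the key and value decoders reflectively: the concrete Decoder
+    // classes carried by U and T expose a (VerifiableProperties) constructor.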
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+    // Create the message streams we are listening to, one per requested thread per topic
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
+
+ val executorPool =
+ ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
+ try {
+      // Start a message handler for each stream
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ }
+ } finally {
+ executorPool.shutdown() // Just causes threads to terminate after work is done
+ }
+ }
+
+ // Handles Kafka messages
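+  // by blocking on the stream's iterator and handing each (key, value) pair to store()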
+ private class MessageHandler(stream: KafkaStream[K, V])
+ extends Runnable {
+ def run() {
+ logInfo("Starting MessageHandler.")
+ try {
+ val streamIterator = stream.iterator()
+ while (streamIterator.hasNext()) {
+ val msgAndMetadata = streamIterator.next()
+ store((msgAndMetadata.key, msgAndMetadata.message))
+ }
+ } catch {
+ case e: Throwable => reportError("Error handling message; exiting", e)
+ }
+ }
+ }
+}
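
For context, a minimal usage sketch (not part of this commit): the stream above is
normally obtained through KafkaUtils.createStream rather than constructed directly.
The ZooKeeper address, group id, and topic map below are illustrative placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Two handler threads for the "events" topic; KafkaReceiver runs one
    // MessageHandler per stream returned by createMessageStreams.
    val messages = KafkaUtils.createStream(
      ssc, "zkhost:2181", "sketch-group", Map("events" -> 2))

    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}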