path: root/streaming/src
author     seanm <sean.mcnamara@webtrends.com>  2013-01-18 21:34:29 -0700
committer  seanm <sean.mcnamara@webtrends.com>  2013-01-18 21:34:29 -0700
commit     d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9
tree       461851d493577204c456c0e19ddddffb36c0fe7f /streaming/src
parent     56b7fbafa2b7717896c613e39ecc134f2405b4c6
kafkaStream API cleanup. A quorum of ZooKeeper nodes can now be specified.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala           8
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala  17
2 files changed, 10 insertions(+), 15 deletions(-)
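For orientation, here is a minimal usage sketch of the API after this change. Only the kafkaStream signature comes from the patch itself; the context setup, master string, application name, batch interval, group id, and topic map are hypothetical, and the quorum string just illustrates the new comma-separated hostname:port form.

    import spark.streaming.{Seconds, StreamingContext}

    // Hypothetical context setup; master, name, and batch interval are illustrative.
    val ssc = new StreamingContext("local[2]", "KafkaQuorumExample", Seconds(2))

    // Before this patch the first two arguments were a single ZooKeeper
    // hostname and port; now one string names the whole quorum.
    val stream = ssc.kafkaStream[String](
      "zk1:2181,zk2:2181,zk3:2181", // zkQuorum: comma-separated hostname:port pairs
      "example-group",              // groupId for this consumer
      Map("events" -> 2)            // topic -> number of consumer threads
    )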
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb1..06cf7a06ed 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -136,8 +136,7 @@ class StreamingContext private (
/**
* Create an input stream that pulls messages from a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,...).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -146,14 +145,13 @@ class StreamingContext private (
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def kafkaStream[T: ClassManifest](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: Map[String, Int],
initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
- val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
+ val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
registerInputStream(inputStream)
inputStream
}
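The optional initialOffsets and storageLevel parameters keep their old shape; only the leading connection arguments change. A hedged sketch of a call that pins a starting offset, reusing the hypothetical ssc from the sketch above — the broker id, partition, and offset values are made up, and KafkaPartitionKey is assumed to live in the dstream package alongside KafkaInputDStream (see the next file):

    import spark.storage.StorageLevel
    import spark.streaming.dstream.KafkaPartitionKey

    // Hypothetical offset: resume topic "events", partition 0 on broker 1,
    // at offset 12345 for this consumer group.
    val offsets = Map(
      KafkaPartitionKey(1, "events", "example-group", 0) -> 12345L
    )

    val offsetStream = ssc.kafkaStream[String](
      "zk1:2181,zk2:2181",
      "example-group",
      Map("events" -> 1),
      initialOffsets = offsets,
      storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    )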
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 533c91ee95..4f8c8b9d10 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -23,8 +23,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param host Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,...).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -35,8 +34,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
- host: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: Map[String, Int],
initialOffsets: Map[KafkaPartitionKey, Long],
@@ -44,13 +42,13 @@ class KafkaInputDStream[T: ClassManifest](
) extends NetworkInputDStream[T](ssc_ ) with Logging {
def createReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
+ new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class KafkaReceiver(host: String, port: Int, groupId: String,
+class KafkaReceiver(zkQuorum: String, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
@@ -73,21 +71,20 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
- val zooKeeperEndPoint = host + ":" + port
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
// ZooKeeper connection properties
val props = new Properties()
- props.put("zk.connect", zooKeeperEndPoint)
+ props.put("zk.connect", zkQuorum)
props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
props.put("groupid", groupId)
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + zooKeeperEndPoint)
+ logInfo("Connecting to Zookeper: " + zkQuorum)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
- logInfo("Connected to " + zooKeeperEndPoint)
+ logInfo("Connected to " + zkQuorum)
// If specified, set the topic offset
setOffsets(initialOffsets)
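For reference, the quorum string is handed to the Kafka 0.7-era consumer configuration verbatim; zk.connect accepts a comma-separated hostname:port list, which is why no per-host handling is needed on the receiver side. A standalone sketch of the equivalent property setup (the timeout and group id are illustrative; the receiver itself uses its ZK_TIMEOUT constant and the groupId passed to its constructor):

    import java.util.Properties

    val zkQuorum = "zk1:2181,zk2:2181,zk3:2181"
    val props = new Properties()
    props.put("zk.connect", zkQuorum)             // the whole quorum, not a single host
    props.put("zk.connectiontimeout.ms", "10000") // illustrative; the code uses ZK_TIMEOUT
    props.put("groupid", "example-group")         // illustrative consumer group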