aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala24
1 files changed, 13 insertions, 11 deletions
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index 387c0e4213..f407367a54 100644
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -17,9 +17,9 @@
package org.apache.spark.examples.streaming
-import java.util.Properties
+import java.util.HashMap
-import kafka.producer._
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
@@ -77,23 +77,25 @@ object KafkaWordCountProducer {
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
// Zookeeper connection properties
- val props = new Properties()
- props.put("metadata.broker.list", brokers)
- props.put("serializer.class", "kafka.serializer.StringEncoder")
+ val props = new HashMap[String, Object]()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer")
- val config = new ProducerConfig(props)
- val producer = new Producer[String, String](config)
+ val producer = new KafkaProducer[String, String](props)
// Send some messages
while(true) {
- val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+ (1 to messagesPerSec.toInt).foreach { messageNum =>
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
.mkString(" ")
- new KeyedMessage[String, String](topic, str)
- }.toArray
+ val message = new ProducerRecord[String, String](topic, null, str)
+ producer.send(message)
+ }
- producer.send(messages: _*)
Thread.sleep(100)
}
}