From 316a5c0423ba3688cacd3acc3c5b5571e8a71d1d Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 6 May 2015 17:44:43 -0700 Subject: [SPARK-7396] [STREAMING] [EXAMPLE] Update KafkaWordCountProducer to use new Producer API Otherwise it will throw exception: ``` Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:77) at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96) at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` Author: jerryshao Closes #5936 from jerryshao/SPARK-7396 and squashes the following commits: 270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue --- .../spark/examples/streaming/KafkaWordCount.scala | 24 ++++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) (limited to 'examples') diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala index 387c0e4213..f407367a54 100644 --- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -17,9 +17,9 @@ package org.apache.spark.examples.streaming -import java.util.Properties +import java.util.HashMap -import kafka.producer._ +import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord} import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ @@ -77,23 +77,25 @@ object KafkaWordCountProducer { val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args // Zookeeper connection properties - val props = new Properties() - props.put("metadata.broker.list", brokers) - props.put("serializer.class", "kafka.serializer.StringEncoder") + val props = new HashMap[String, Object]() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") - val config = new ProducerConfig(props) - val producer = new Producer[String, String](config) + val producer = new KafkaProducer[String, String](props) // Send some messages while(true) { - val messages = (1 to messagesPerSec.toInt).map { messageNum => + (1 to messagesPerSec.toInt).foreach { messageNum => val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) .mkString(" ") - new KeyedMessage[String, String](topic, str) - }.toArray + val message = new ProducerRecord[String, String](topic, null, str) + producer.send(message) + } - producer.send(messages: _*) Thread.sleep(100) } } -- cgit v1.2.3