diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-08-01 06:56:52 -0700 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-08-01 06:56:52 -0700 |
commit | f93ad4fe7c9728c8dd67a8095de3d39fad21d03f (patch) | |
tree | c144cbb09e56a114c0b51bf127c027cf1f6a02a9 /external/kafka-0-10 | |
parent | 1e9b59b73bdb8aacf5a85e0eed29efc6485a3bc3 (diff) | |
download | spark-f93ad4fe7c9728c8dd67a8095de3d39fad21d03f.tar.gz spark-f93ad4fe7c9728c8dd67a8095de3d39fad21d03f.tar.bz2 spark-f93ad4fe7c9728c8dd67a8095de3d39fad21d03f.zip |
[SPARK-16776][STREAMING] Replace deprecated API in KafkaTestUtils for 0.10.0.
## What changes were proposed in this pull request?
This PR replaces the old Kafka API to 0.10.0 ones in `KafkaTestUtils`.
The change include:
- `Producer` to `KafkaProducer`
- Change configurations to equalvant ones. (I referred [here](http://kafka.apache.org/documentation.html#producerconfigs) for 0.10.0 and [here](http://kafka.apache.org/082/documentation.html#producerconfigs
) for old, 0.8.2).
This PR will remove the build warning as below:
```scala
[WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:71: class Producer in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.KafkaProducer instead.
[WARNING] private var producer: Producer[String, String] = _
[WARNING] ^
[WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:181: class Producer in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.KafkaProducer instead.
[WARNING] producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
[WARNING] ^
[WARNING] .../spark/streaming/kafka010/KafkaTestUtils.scala:181: class ProducerConfig in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.ProducerConfig instead.
[WARNING] producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
[WARNING] ^
[WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:182: class KeyedMessage in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.ProducerRecord instead.
[WARNING] producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
[WARNING] ^
[WARNING] four warnings found
[WARNING] warning: [options] bootstrap class path not set in conjunction with -source 1.7
[WARNING] 1 warning
```
## How was this patch tested?
Existing tests that use `KafkaTestUtils` should cover this.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #14416 from HyukjinKwon/SPARK-16776.
Diffstat (limited to 'external/kafka-0-10')
-rw-r--r-- | external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 20 |
1 files changed, 12 insertions, 8 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 19192e4b95..ecabe1c365 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -30,10 +30,10 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request -import kafka.producer.{KeyedMessage, Producer, ProducerConfig} -import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.ZkUtils +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf @@ -68,7 +68,7 @@ private[kafka010] class KafkaTestUtils extends Logging { private var server: KafkaServer = _ // Kafka producer - private var producer: Producer[String, String] = _ + private var producer: KafkaProducer[String, String] = _ // Flag to test whether the system is correctly started private var zkReady = false @@ -178,8 +178,10 @@ private[kafka010] class KafkaTestUtils extends Logging { /** Send the array of messages to the Kafka broker */ def sendMessages(topic: String, messages: Array[String]): Unit = { - producer = new Producer[String, String](new ProducerConfig(producerConfiguration)) - producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) + producer = new KafkaProducer[String, String](producerConfiguration) + messages.foreach { message => + producer.send(new ProducerRecord[String, String](topic, message)) + } producer.close() producer = null } @@ -198,10 +200,12 @@ private[kafka010] class KafkaTestUtils extends Logging { private def producerConfiguration: Properties = { val props = new Properties() - props.put("metadata.broker.list", brokerAddress) - props.put("serializer.class", classOf[StringEncoder].getName) + props.put("bootstrap.servers", brokerAddress) + props.put("value.serializer", classOf[StringSerializer].getName) + // Key serializer is required. + props.put("key.serializer", classOf[StringSerializer].getName) // wait for all in-sync replicas to ack sends - props.put("request.required.acks", "-1") + props.put("acks", "all") props } |