aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10')
-rw-r--r--external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala20
1 files changed, 12 insertions, 8 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 19192e4b95..ecabe1c365 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -30,10 +30,10 @@ import scala.util.control.NonFatal
import kafka.admin.AdminUtils
import kafka.api.Request
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.ZkUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.StringSerializer
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.apache.spark.SparkConf
@@ -68,7 +68,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
private var server: KafkaServer = _
// Kafka producer
- private var producer: Producer[String, String] = _
+ private var producer: KafkaProducer[String, String] = _
// Flag to test whether the system is correctly started
private var zkReady = false
@@ -178,8 +178,10 @@ private[kafka010] class KafkaTestUtils extends Logging {
/** Send the array of messages to the Kafka broker */
def sendMessages(topic: String, messages: Array[String]): Unit = {
- producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
- producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
+ producer = new KafkaProducer[String, String](producerConfiguration)
+ messages.foreach { message =>
+ producer.send(new ProducerRecord[String, String](topic, message))
+ }
producer.close()
producer = null
}
@@ -198,10 +200,12 @@ private[kafka010] class KafkaTestUtils extends Logging {
private def producerConfiguration: Properties = {
val props = new Properties()
- props.put("metadata.broker.list", brokerAddress)
- props.put("serializer.class", classOf[StringEncoder].getName)
+ props.put("bootstrap.servers", brokerAddress)
+ props.put("value.serializer", classOf[StringSerializer].getName)
+ // Key serializer is required.
+ props.put("key.serializer", classOf[StringSerializer].getName)
// wait for all in-sync replicas to ack sends
- props.put("request.required.acks", "-1")
+ props.put("acks", "all")
props
}