path: root/external/kafka-0-10-sql/src
author    Tyson Condie <tcondie@gmail.com>  2017-03-06 16:39:05 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2017-03-06 16:39:05 -0800
commit    b0a5cd89097c563e9949d8cfcf84d18b03b8d24c (patch)
tree      39c0c52b58c7b38eb105d6791e1683cdbe0f271a /external/kafka-0-10-sql/src
parent    f6471dc0d5db2d98e48f9f1ae1dba0f174ed9648 (diff)
download  spark-b0a5cd89097c563e9949d8cfcf84d18b03b8d24c.tar.gz
          spark-b0a5cd89097c563e9949d8cfcf84d18b03b8d24c.tar.bz2
          spark-b0a5cd89097c563e9949d8cfcf84d18b03b8d24c.zip
[SPARK-19719][SS] Kafka writer for both structured streaming and batch queries
## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.

### Streaming Kafka Sink
- When addBatch is called:
  - If batchId is greater than the last written batch, write the batch to Kafka.
    - The topic is taken from the record, if present, or from the topic option, which overrides the topic in the record.
  - Otherwise, ignore the batch.

### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider.
- CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka.
- The topic is taken from the record, if present, or from the topic option, which overrides the topic in the record.
- Save modes Append and ErrorIfExists are supported with identical semantics. Other save modes result in an AnalysisException.

tdas zsxwing

## How was this patch tested?

### The following unit tests are included
- write to stream with topic field: valid stream write with data that includes an existing topic in the schema.
- write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic.
- write data with bad schema: various cases of writing data that does not conform to a proper schema, e.g., (1) no topic field or default topic, and (2) no value field.
- write data with valid schema but wrong types: data with a complete schema but wrong types, e.g., key and value types are integers.
- write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured not to auto-create topics.
- write batch to kafka: simple batch write to Kafka, which goes through the same code path as the streaming scenario, so validity checks are not redone here.

### Examples
```scala
// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
  .selectExpr("value as key", "value as value")
  .writeStream
  .format("kafka")
  .option("checkpointLocation", checkpointDir)
  .outputMode(OutputMode.Append)
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", topic)
  .queryName("kafkaStream")
  .start()

// Batch
val df = spark
  .sparkContext
  .parallelize(Seq("1", "2", "3", "4", "5"))
  .map(v => (topic, v))
  .toDF("topic", "value")

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", topic)
  .save()
```

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #17043 from tcondie/kafka-writer.
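Both examples above set a `topic` option explicitly. A minimal sketch of the other resolution path described in the summary, where each row carries its own destination in a `topic` column and no `topic` option is set; `spark`, `brokerAddress`, and the topic names are placeholders, not part of the patch:

```scala
// Hedged sketch: per-record topic routing via a "topic" column, no "topic" option.
// Assumes an existing SparkSession `spark`, a reachable broker at `brokerAddress`,
// and that the topics "events-a" and "events-b" already exist.
import spark.implicits._

val routed = Seq(
  ("events-a", "k1", "v1"),
  ("events-b", "k2", "v2")
).toDF("topic", "key", "value")

routed.write
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .save()
```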
Diffstat (limited to 'external/kafka-0-10-sql/src')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala            43
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala  83
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala      123
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala          97
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala      412
5 files changed, 753 insertions, 5 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
new file mode 100644
index 0000000000..08914d82ff
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.Sink
+
+private[kafka010] class KafkaSink(
+ sqlContext: SQLContext,
+ executorKafkaParams: ju.Map[String, Object],
+ topic: Option[String]) extends Sink with Logging {
+ @volatile private var latestBatchId = -1L
+
+ override def toString(): String = "KafkaSink"
+
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ if (batchId <= latestBatchId) {
+ logInfo(s"Skipping already committed batch $batchId")
+ } else {
+ KafkaWriter.write(sqlContext.sparkSession,
+ data.queryExecution, executorKafkaParams, topic)
+ latestBatchId = batchId
+ }
+ }
+}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6a74567198..febe3c2171 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -23,12 +23,14 @@ import java.util.UUID
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.streaming.Source
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
@@ -36,8 +38,12 @@ import org.apache.spark.sql.types.StructType
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
* missing options even before the query is started.
*/
-private[kafka010] class KafkaSourceProvider extends DataSourceRegister with StreamSourceProvider
- with RelationProvider with Logging {
+private[kafka010] class KafkaSourceProvider extends DataSourceRegister
+ with StreamSourceProvider
+ with StreamSinkProvider
+ with RelationProvider
+ with CreatableRelationProvider
+ with Logging {
import KafkaSourceProvider._
override def shortName(): String = "kafka"
@@ -152,6 +158,72 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with Stre
endingRelationOffsets)
}
+ override def createSink(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String],
+ outputMode: OutputMode): Sink = {
+ val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+ val specifiedKafkaParams = kafkaParamsForProducer(parameters)
+ new KafkaSink(sqlContext,
+ new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
+ }
+
+ override def createRelation(
+ outerSQLContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation = {
+ mode match {
+ case SaveMode.Overwrite | SaveMode.Ignore =>
+ throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " +
+ s"Allowed save modes are ${SaveMode.Append} and " +
+ s"${SaveMode.ErrorIfExists} (default).")
+ case _ => // good
+ }
+ val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+ val specifiedKafkaParams = kafkaParamsForProducer(parameters)
+ KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
+ new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
+
+ /* This method is supposed to return a relation that reads the data that was written.
+ * We cannot support this for Kafka. Therefore, in order to make things consistent,
+ * we return an empty base relation.
+ */
+ new BaseRelation {
+ override def sqlContext: SQLContext = unsupportedException
+ override def schema: StructType = unsupportedException
+ override def needConversion: Boolean = unsupportedException
+ override def sizeInBytes: Long = unsupportedException
+ override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
+ private def unsupportedException =
+ throw new UnsupportedOperationException("BaseRelation from Kafka write " +
+ "operation is not usable.")
+ }
+ }
+
+ private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+ if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
+ throw new IllegalArgumentException(
+ s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
+ + "are serialized with ByteArraySerializer.")
+ }
+
+ if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
+ {
+ throw new IllegalArgumentException(
+ s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
+ + "value are serialized with ByteArraySerializer.")
+ }
+ parameters
+ .keySet
+ .filter(_.toLowerCase.startsWith("kafka."))
+ .map { k => k.drop(6).toString -> parameters(k) }
+ .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
+ }
+
private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
ConfigUpdater("source", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
@@ -381,6 +453,7 @@ private[kafka010] object KafkaSourceProvider {
private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+ val TOPIC_OPTION_KEY = "topic"
private val deserClassName = classOf[ByteArrayDeserializer].getName
}
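As a usage note on `kafkaParamsForProducer` above: options prefixed with `kafka.` are forwarded to the producer with the prefix stripped, while the key and value serializers are always pinned to `ByteArraySerializer` and cannot be overridden. A hedged sketch; `df`, `brokerAddress`, and the topic name are placeholders:

```scala
// Hedged sketch of the option handling in kafkaParamsForProducer.
// "kafka."-prefixed options reach the producer with the prefix stripped;
// serializer options are rejected because both serializers are fixed.
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress) // producer: bootstrap.servers
  .option("kafka.acks", "all")                      // producer: acks=all
  .option("topic", "events")                        // sink option, not a producer config
  .save()

// These would throw IllegalArgumentException:
//   .option("kafka.key.serializer", ...)
//   .option("kafka.value.serializer", ...)
```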
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
new file mode 100644
index 0000000000..6e160cbe2d
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.producer.{KafkaProducer, _}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
+import org.apache.spark.sql.types.{BinaryType, StringType}
+
+/**
+ * A simple trait for writing out data in a single Spark task, without any concerns about how
+ * to commit or abort tasks. Exceptions thrown by the implementation of this class will
+ * automatically trigger task aborts.
+ */
+private[kafka010] class KafkaWriteTask(
+ producerConfiguration: ju.Map[String, Object],
+ inputSchema: Seq[Attribute],
+ topic: Option[String]) {
+ // used to synchronize with Kafka callbacks
+ @volatile private var failedWrite: Exception = null
+ private val projection = createProjection
+ private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
+
+ /**
+ * Writes key value data out to topics.
+ */
+ def execute(iterator: Iterator[InternalRow]): Unit = {
+ producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
+ while (iterator.hasNext && failedWrite == null) {
+ val currentRow = iterator.next()
+ val projectedRow = projection(currentRow)
+ val topic = projectedRow.getUTF8String(0)
+ val key = projectedRow.getBinary(1)
+ val value = projectedRow.getBinary(2)
+ if (topic == null) {
+ throw new NullPointerException(s"null topic present in the data. Use the " +
+ s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
+ }
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+ val callback = new Callback() {
+ override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
+ if (failedWrite == null && e != null) {
+ failedWrite = e
+ }
+ }
+ }
+ producer.send(record, callback)
+ }
+ }
+
+ def close(): Unit = {
+ if (producer != null) {
+ checkForErrors
+ producer.close()
+ checkForErrors
+ producer = null
+ }
+ }
+
+ private def createProjection: UnsafeProjection = {
+ val topicExpression = topic.map(Literal(_)).orElse {
+ inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
+ }.getOrElse {
+ throw new IllegalStateException(s"topic option required when no " +
+ s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
+ }
+ topicExpression.dataType match {
+ case StringType => // good
+ case t =>
+ throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
+ s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
+ s"must be a ${StringType}")
+ }
+ val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
+ .getOrElse(Literal(null, BinaryType))
+ keyExpression.dataType match {
+ case StringType | BinaryType => // good
+ case t =>
+ throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
+ s"attribute unsupported type $t")
+ }
+ val valueExpression = inputSchema
+ .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
+ throw new IllegalStateException(s"Required attribute " +
+ s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
+ )
+ valueExpression.dataType match {
+ case StringType | BinaryType => // good
+ case t =>
+ throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
+ s"attribute unsupported type $t")
+ }
+ UnsafeProjection.create(
+ Seq(topicExpression, Cast(keyExpression, BinaryType),
+ Cast(valueExpression, BinaryType)), inputSchema)
+ }
+
+ private def checkForErrors: Unit = {
+ if (failedWrite != null) {
+ throw failedWrite
+ }
+ }
+}
+
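`KafkaWriteTask` sends records asynchronously and only records the first callback failure; `close()` then closes the producer, which flushes outstanding sends, and rethrows that failure. A standalone sketch of the same pattern against a raw `KafkaProducer`; the broker address and topic name are placeholders:

```scala
import java.util.Properties
import java.util.concurrent.atomic.AtomicReference

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer

// Hedged sketch of the "capture the first async failure, rethrow after close"
// pattern used by KafkaWriteTask. Broker address and topic are placeholders.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", classOf[ByteArraySerializer].getName)
props.put("value.serializer", classOf[ByteArraySerializer].getName)

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
val firstFailure = new AtomicReference[Exception](null)

val callback = new Callback {
  override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) firstFailure.compareAndSet(null, e) // remember only the first error
  }
}

try {
  producer.send(new ProducerRecord("events", "k".getBytes, "v".getBytes), callback)
} finally {
  producer.close() // flushes outstanding sends, so all callbacks have fired
  if (firstFailure.get() != null) throw firstFailure.get()
}
```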
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
new file mode 100644
index 0000000000..a637d52c93
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.types.{BinaryType, StringType}
+import org.apache.spark.util.Utils
+
+/**
+ * The [[KafkaWriter]] class is used to write data from a batch query
+ * or structured streaming query, given by a [[QueryExecution]], to Kafka.
+ * The data is assumed to have a value column, and an optional topic and key
+ * columns. If the topic column is missing, then the topic must come from
+ * the 'topic' configuration option. If the key column is missing, then a
+ * null valued key field will be added to the
+ * [[org.apache.kafka.clients.producer.ProducerRecord]].
+ */
+private[kafka010] object KafkaWriter extends Logging {
+ val TOPIC_ATTRIBUTE_NAME: String = "topic"
+ val KEY_ATTRIBUTE_NAME: String = "key"
+ val VALUE_ATTRIBUTE_NAME: String = "value"
+
+ override def toString: String = "KafkaWriter"
+
+ def validateQuery(
+ queryExecution: QueryExecution,
+ kafkaParameters: ju.Map[String, Object],
+ topic: Option[String] = None): Unit = {
+ val schema = queryExecution.logical.output
+ schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
+ if (topic == None) {
+ throw new AnalysisException(s"topic option required when no " +
+ s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
+ s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
+ } else {
+ Literal(topic.get, StringType)
+ }
+ ).dataType match {
+ case StringType => // good
+ case _ =>
+ throw new AnalysisException(s"Topic type must be a String")
+ }
+ schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
+ Literal(null, StringType)
+ ).dataType match {
+ case StringType | BinaryType => // good
+ case _ =>
+ throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
+ s"must be a String or BinaryType")
+ }
+ schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
+ throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
+ ).dataType match {
+ case StringType | BinaryType => // good
+ case _ =>
+ throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
+ s"must be a String or BinaryType")
+ }
+ }
+
+ def write(
+ sparkSession: SparkSession,
+ queryExecution: QueryExecution,
+ kafkaParameters: ju.Map[String, Object],
+ topic: Option[String] = None): Unit = {
+ val schema = queryExecution.logical.output
+ validateQuery(queryExecution, kafkaParameters, topic)
+ SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+ queryExecution.toRdd.foreachPartition { iter =>
+ val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
+ Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
+ finallyBlock = writeTask.close())
+ }
+ }
+ }
+}
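`validateQuery` enforces the schema contract up front: a `value` column (string or binary) is required, while `topic` and `key` are optional. A hedged sketch of a frame that fails this check before any records are sent; `spark` and `brokerAddress` are placeholders:

```scala
// Hedged sketch: a frame without a "value" column is rejected by validateQuery
// before any data reaches Kafka. `spark` and `brokerAddress` are placeholders.
import spark.implicits._

val noValue = Seq(("events", "k1")).toDF("topic", "key")

// Expected: org.apache.spark.sql.AnalysisException
//   "Required attribute 'value' not found"
noValue.write
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .save()
```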
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
new file mode 100644
index 0000000000..490535623c
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+class KafkaSinkSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ protected var testUtils: KafkaTestUtils = _
+
+ override val streamingTimeout = 30.seconds
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils(
+ withBrokerProps = Map("auto.create.topics.enable" -> "false"))
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ super.afterAll()
+ }
+ }
+
+ test("batch - write to kafka") {
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+ val df = Seq("1", "2", "3", "4", "5").map(v => (topic, v)).toDF("topic", "value")
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("topic", topic)
+ .save()
+ checkAnswer(
+ createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
+ Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
+ }
+
+ test("batch - null topic field value, and no topic option") {
+ val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+ val ex = intercept[SparkException] {
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .save()
+ }
+ assert(ex.getMessage.toLowerCase.contains(
+ "null topic present in the data"))
+ }
+
+ test("batch - unsupported save modes") {
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+ val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+
+ // Test bad save mode Ignore
+ var ex = intercept[AnalysisException] {
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .mode(SaveMode.Ignore)
+ .save()
+ }
+ assert(ex.getMessage.toLowerCase.contains(
+ s"save mode ignore not allowed for kafka"))
+
+ // Test bad save mode Overwrite
+ ex = intercept[AnalysisException] {
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+ assert(ex.getMessage.toLowerCase.contains(
+ s"save mode overwrite not allowed for kafka"))
+ }
+
+ test("streaming - write to kafka with topic field") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val writer = createKafkaWriter(
+ input.toDF(),
+ withTopic = None,
+ withOutputMode = Some(OutputMode.Append))(
+ withSelectExpr = s"'$topic' as topic", "value")
+
+ val reader = createKafkaReader(topic)
+ .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+ .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+ .as[(Int, Int)]
+ .map(_._2)
+
+ try {
+ input.addData("1", "2", "3", "4", "5")
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+ input.addData("6", "7", "8", "9", "10")
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ } finally {
+ writer.stop()
+ }
+ }
+
+ test("streaming - write aggregation w/o topic field, with topic option") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val writer = createKafkaWriter(
+ input.toDF().groupBy("value").count(),
+ withTopic = Some(topic),
+ withOutputMode = Some(OutputMode.Update()))(
+ withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) value")
+
+ val reader = createKafkaReader(topic)
+ .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+ .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+ .as[(Int, Int)]
+
+ try {
+ input.addData("1", "2", "2", "3", "3", "3")
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
+ input.addData("1", "2", "3")
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
+ } finally {
+ writer.stop()
+ }
+ }
+
+ test("streaming - aggregation with topic field and topic option") {
+ /* The purpose of this test is to ensure that the topic option
+ * overrides the topic field. We begin by writing some data that
+ * includes a topic field and value (e.g., 'foo') along with a topic
+ * option. Then when we read from the topic specified in the option
+ * we should see the data i.e., the data was written to the topic
+ * option, and not to the topic in the data e.g., foo
+ */
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val writer = createKafkaWriter(
+ input.toDF().groupBy("value").count(),
+ withTopic = Some(topic),
+ withOutputMode = Some(OutputMode.Update()))(
+ withSelectExpr = "'foo' as topic",
+ "CAST(value as STRING) key", "CAST(count as STRING) value")
+
+ val reader = createKafkaReader(topic)
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
+ .as[(Int, Int)]
+
+ try {
+ input.addData("1", "2", "2", "3", "3", "3")
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
+ input.addData("1", "2", "3")
+ failAfter(streamingTimeout) {
+ writer.processAllAvailable()
+ }
+ checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
+ } finally {
+ writer.stop()
+ }
+ }
+
+
+ test("streaming - write data with bad schema") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ /* No topic field or topic option */
+ var writer: StreamingQuery = null
+ var ex: Exception = null
+ try {
+ ex = intercept[StreamingQueryException] {
+ writer = createKafkaWriter(input.toDF())(
+ withSelectExpr = "value as key", "value"
+ )
+ input.addData("1", "2", "3", "4", "5")
+ writer.processAllAvailable()
+ }
+ } finally {
+ writer.stop()
+ }
+ assert(ex.getMessage
+ .toLowerCase
+ .contains("topic option required when no 'topic' attribute is present"))
+
+ try {
+ /* No value field */
+ ex = intercept[StreamingQueryException] {
+ writer = createKafkaWriter(input.toDF())(
+ withSelectExpr = s"'$topic' as topic", "value as key"
+ )
+ input.addData("1", "2", "3", "4", "5")
+ writer.processAllAvailable()
+ }
+ } finally {
+ writer.stop()
+ }
+ assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found"))
+ }
+
+ test("streaming - write data with valid schema but wrong types") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ var writer: StreamingQuery = null
+ var ex: Exception = null
+ try {
+ /* topic field wrong type */
+ ex = intercept[StreamingQueryException] {
+ writer = createKafkaWriter(input.toDF())(
+ withSelectExpr = s"CAST('1' as INT) as topic", "value"
+ )
+ input.addData("1", "2", "3", "4", "5")
+ writer.processAllAvailable()
+ }
+ } finally {
+ writer.stop()
+ }
+ assert(ex.getMessage.toLowerCase.contains("topic type must be a string"))
+
+ try {
+ /* value field wrong type */
+ ex = intercept[StreamingQueryException] {
+ writer = createKafkaWriter(input.toDF())(
+ withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
+ )
+ input.addData("1", "2", "3", "4", "5")
+ writer.processAllAvailable()
+ }
+ } finally {
+ writer.stop()
+ }
+ assert(ex.getMessage.toLowerCase.contains(
+ "value attribute type must be a string or binarytype"))
+
+ try {
+ ex = intercept[StreamingQueryException] {
+ /* key field wrong type */
+ writer = createKafkaWriter(input.toDF())(
+ withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
+ )
+ input.addData("1", "2", "3", "4", "5")
+ writer.processAllAvailable()
+ }
+ } finally {
+ writer.stop()
+ }
+ assert(ex.getMessage.toLowerCase.contains(
+ "key attribute type must be a string or binarytype"))
+ }
+
+ test("streaming - write to non-existing topic") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+
+ var writer: StreamingQuery = null
+ var ex: Exception = null
+ try {
+ ex = intercept[StreamingQueryException] {
+ writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
+ input.addData("1", "2", "3", "4", "5")
+ writer.processAllAvailable()
+ }
+ } finally {
+ writer.stop()
+ }
+ assert(ex.getMessage.toLowerCase.contains("job aborted"))
+ }
+
+ test("streaming - exception on config serializer") {
+ val input = MemoryStream[String]
+ var writer: StreamingQuery = null
+ var ex: Exception = null
+ ex = intercept[IllegalArgumentException] {
+ writer = createKafkaWriter(
+ input.toDF(),
+ withOptions = Map("kafka.key.serializer" -> "foo"))()
+ }
+ assert(ex.getMessage.toLowerCase.contains(
+ "kafka option 'key.serializer' is not supported"))
+
+ ex = intercept[IllegalArgumentException] {
+ writer = createKafkaWriter(
+ input.toDF(),
+ withOptions = Map("kafka.value.serializer" -> "foo"))()
+ }
+ assert(ex.getMessage.toLowerCase.contains(
+ "kafka option 'value.serializer' is not supported"))
+ }
+
+ test("generic - write big data with small producer buffer") {
+ /* This test ensures that we understand the semantics of Kafka when
+ * it comes to blocking on a call to send when the send buffer is full.
+ * This test will configure the smallest possible producer buffer and
+ * indicate that we should block when it is full. Thus, no exception should
+ * be thrown in the case of a full buffer.
+ */
+ val topic = newTopic()
+ testUtils.createTopic(topic, 1)
+ val options = new java.util.HashMap[String, Object]
+ options.put("bootstrap.servers", testUtils.brokerAddress)
+ options.put("buffer.memory", "16384") // min buffer size
+ options.put("block.on.buffer.full", "true")
+ options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+ options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+ val inputSchema = Seq(AttributeReference("value", BinaryType)())
+ val data = new Array[Byte](15000) // large value
+ val writeTask = new KafkaWriteTask(options, inputSchema, Some(topic))
+ try {
+ val fieldTypes: Array[DataType] = Array(BinaryType)
+ val converter = UnsafeProjection.create(fieldTypes)
+ val row = new SpecificInternalRow(fieldTypes)
+ row.update(0, data)
+ val iter = Seq.fill(1000)(converter.apply(row)).iterator
+ writeTask.execute(iter)
+ } finally {
+ writeTask.close()
+ }
+ }
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+ private def createKafkaReader(topic: String): DataFrame = {
+ spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("startingOffsets", "earliest")
+ .option("endingOffsets", "latest")
+ .option("subscribe", topic)
+ .load()
+ }
+
+ private def createKafkaWriter(
+ input: DataFrame,
+ withTopic: Option[String] = None,
+ withOutputMode: Option[OutputMode] = None,
+ withOptions: Map[String, String] = Map[String, String]())
+ (withSelectExpr: String*): StreamingQuery = {
+ var stream: DataStreamWriter[Row] = null
+ withTempDir { checkpointDir =>
+ var df = input.toDF()
+ if (withSelectExpr.length > 0) {
+ df = df.selectExpr(withSelectExpr: _*)
+ }
+ stream = df.writeStream
+ .format("kafka")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .queryName("kafkaStream")
+ withTopic.foreach(stream.option("topic", _))
+ withOutputMode.foreach(stream.outputMode(_))
+ withOptions.foreach(opt => stream.option(opt._1, opt._2))
+ }
+ stream.start()
+ }
+}