From c2d1761a57f5d175913284533b3d0417e8718688 Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Mon, 20 Mar 2017 17:18:59 -0700
Subject: [SPARK-19906][SS][DOCS] Documentation describing how to write queries
 to Kafka

## What changes were proposed in this pull request?

Add documentation that describes how to write streaming and batch queries to Kafka.

zsxwing tdas

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie

Closes #17246 from tcondie/kafka-write-docs.
---
 docs/structured-streaming-kafka-integration.md | 321 ++++++++++++++++++++-----
 1 file changed, 264 insertions(+), 57 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 522e669568..217c1a91a1 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -3,9 +3,9 @@ layout: global
 title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
 ---

-Structured Streaming integration for Kafka 0.10 to poll data from Kafka.
+Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka.

-### Linking
+## Linking
For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:

    groupId = org.apache.spark

@@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
For Python applications, you need to add this above library and its dependencies when deploying your
application. See the [Deploying](#deploying) subsection below.

-### Creating a Kafka Source Stream
+## Reading Data from Kafka
+
+### Creating a Kafka Source for Streaming Queries
{% highlight scala %}

// Subscribe to 1 topic
-val ds1 = spark
+val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
-val ds2 = spark
+val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
-val ds3 = spark
+val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

{% endhighlight %}

@@ -57,31 +59,31 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% highlight java %}

// Subscribe to 1 topic
-Dataset<Row> ds1 = spark
+Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics
-Dataset<Row> ds2 = spark
+Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
-Dataset<Row> ds3 = spark
+Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

{% endhighlight %}
@@ -89,37 +91,37 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% highlight python %}

# Subscribe to 1 topic
-ds1 = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribe", "topic1")
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribe", "topic1") \
  .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics
-ds2 = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribe", "topic1,topic2")
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribe", "topic1,topic2") \
  .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
-ds3 = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribePattern", "topic.*")
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribePattern", "topic.*") \
  .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}
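Since the key and value columns are returned as binary, the examples above cast them explicitly. A value holding structured data such as JSON can likewise be parsed with ordinary DataFrame operations. A minimal Scala sketch, assuming the streaming `df` read above and a hypothetical two-field JSON schema:

{% highlight scala %}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// A sketch: parse JSON-encoded Kafka values after casting from binary.
// The two-field schema is an assumed example.
val schema = new StructType()
  .add("id", StringType)
  .add("payload", StringType)

val parsed = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")
{% endhighlight %}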
-### Creating a Kafka Source Batch
+### Creating a Kafka Source for Batch Queries
If you have a use case that is better suited to batch processing,
you can create a Dataset/DataFrame for a defined range of offsets.

{% highlight scala %}

// Subscribe to 1 topic defaults to the earliest and latest offsets
-val ds1 = spark
+val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
-val ds2 = spark
+val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -146,11 +148,11 @@ val ds2 = spark
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
-val ds3 = spark
+val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -158,7 +160,7 @@ val ds3 = spark
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

{% endhighlight %}

@@ -167,16 +169,16 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% highlight java %}

// Subscribe to 1 topic defaults to the earliest and latest offsets
-Dataset<Row> ds1 = spark
+Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
-Dataset<Row> ds2 = spark
+Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -184,10 +186,10 @@ Dataset<Row> ds2 = spark
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
-Dataset<Row> ds3 = spark
+Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -195,7 +197,7 @@ Dataset<Row> ds3 = spark
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

{% endhighlight %}

@@ -203,16 +205,16 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

{% highlight python %}

# Subscribe to 1 topic defaults to the earliest and latest offsets
-ds1 = spark \
+df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
-ds2 = spark \
+df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -220,10 +222,10 @@ ds2 = spark \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
-ds3 = spark \
+df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -231,8 +233,7 @@ ds3 = spark \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}

@@ -373,11 +374,213 @@ The following configurations are optional:
+## Writing Data to Kafka
+
+Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
+Apache Kafka only supports at-least-once write semantics. Consequently, when writing---either Streaming Queries
+or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
+to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
+Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
+if the query completes successfully, then you can assume that the query output was written at least once. A possible
+solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
+that can be used to perform de-duplication when reading (a sketch of this appears below, after the option tables).
+
+The DataFrame being written to Kafka should have the following columns in its schema:
+
+| Column            | Type             |
+|-------------------|------------------|
+| key (optional)    | string or binary |
+| value (required)  | string or binary |
+| topic (*optional) | string           |
+
+\* The topic column is required if the "topic" configuration option is not specified.
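For instance, a DataFrame with arbitrary columns can be projected onto this schema before it is written. A minimal Scala sketch, assuming an input DataFrame `input` with columns `id` and `payload`; `to_json` and `struct` are standard Spark SQL functions:

{% highlight scala %}
import org.apache.spark.sql.functions.{col, struct, to_json}

// A sketch: shape an arbitrary DataFrame into the key/value schema
// expected by the Kafka sink. `input`, `id`, and `payload` are assumed names.
val kafkaReady = input.select(
  col("id").cast("string").as("key"),          // optional key column
  to_json(struct(col("payload"))).as("value")  // required value column
)
{% endhighlight %}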
+
+The value column is the only required column. If a key column is not specified then
+a ```null``` valued key column will be automatically added (see the Kafka semantics for
+how ```null``` valued keys are handled). If a topic column exists then its value
+is used as the topic when writing the given row to Kafka, unless the "topic" configuration
+option is set; i.e., the "topic" configuration option overrides the topic column.
+
+The following options must be set for the Kafka sink
+for both batch and streaming queries.
+
+| Option                  | value                               | meaning                                      |
+|-------------------------|-------------------------------------|----------------------------------------------|
+| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. |
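Other Kafka producer properties can be passed through in the same way, using the `kafka.` prefix described in the Kafka-specific configurations section below. A minimal Scala sketch, using the standard Kafka producer property `compression.type` purely as an example:

{% highlight scala %}
// A sketch: any Kafka producer property can be set with the kafka. prefix;
// "compression.type" is only an illustrative choice.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.compression.type", "snappy")
  .option("topic", "topic1")
  .save()
{% endhighlight %}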
+
+The following configurations are optional:
+
+| Option | value  | default | query type          | meaning                                                                                                                       |
+|--------|--------|---------|---------------------|-------------------------------------------------------------------------------------------------------------------------------|
+| topic  | string | none    | streaming and batch | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. |
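As suggested in the overview above, duplicates introduced by at-least-once delivery can be removed when the data is read back, provided each record carries a unique identifier. A minimal Scala sketch, assuming the message key was populated with such an identifier:

{% highlight scala %}
// A sketch: read back the written topic and de-duplicate on the key.
// Assumes the key holds a unique record identifier.
val deduped = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .dropDuplicates("key")
{% endhighlight %}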
+
+### Creating a Kafka Sink for Streaming Queries
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val ds = df
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start();
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery ds = df
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start();
+
+{% endhighlight %}
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+ds = df \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+ds = df \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+
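One practical note on the examples above: like other fault-tolerant sinks, a streaming query writing to Kafka needs a checkpoint location. A minimal Scala sketch, where the checkpoint path is an assumed example:

{% highlight scala %}
// A sketch: streaming writes to Kafka track their progress in a checkpoint
// directory; the path below is only an example.
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .option("topic", "topic1")
  .start()
{% endhighlight %}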
+
+### Writing the output of Batch Queries to Kafka
+
+
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save();
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save();
+
+{% endhighlight %}
+
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .save()
+
+{% endhighlight %}
+
+
+## Kafka Specific Configurations
+
Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g.,
-`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafkaParams, see
-[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
+`stream.option("kafka.bootstrap.servers", "host:port")`. For possible Kafka parameters, see
+[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
+parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
+for parameters related to writing data.

-Note that the following Kafka params cannot be set and the Kafka source will throw an exception:
+Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

- **group.id**: Kafka source will create a unique group id for each query automatically.
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
@@ -389,11 +592,15 @@ Note that the following Kafka params cannot be set and the Kafka source will thr
DataFrame operations to explicitly deserialize the keys.
- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use
DataFrame operations to explicitly deserialize the values.
+- **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
+DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
+- **value.serializer**: Values are always serialized with ByteArraySerializer or StringSerializer. Use
+DataFrame operations to explicitly serialize the values into either strings or byte arrays.
- **enable.auto.commit**: Kafka source doesn't commit any offset.
- **interceptor.classes**: Kafka source always reads keys and values as byte arrays. It's not safe
to use ConsumerInterceptor as it may break the query.

-### Deploying
+## Deploying

As with any Spark application, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}`
and its dependencies can be directly added to `spark-submit` using `--packages`, such as,