aboutsummaryrefslogtreecommitdiff
path: root/docs/streaming-kafka-integration.md
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-06-12 15:22:59 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-06-12 15:22:59 -0700
commite9471d3414d327c7d0853e18f1844ab1bd09c8ed (patch)
treec71dabecf459ff18b1c974c40f6b8f2f8924a9ce /docs/streaming-kafka-integration.md
parent88604051511c788d7abb41a49e3eb3a8330c09a9 (diff)
downloadspark-e9471d3414d327c7d0853e18f1844ab1bd09c8ed.tar.gz
spark-e9471d3414d327c7d0853e18f1844ab1bd09c8ed.tar.bz2
spark-e9471d3414d327c7d0853e18f1844ab1bd09c8ed.zip
[SPARK-7284] [STREAMING] Updated streaming documentation
- Kinesis API updated - Kafka version updated, and Python API for Direct Kafka added - Added SQLContext.getOrCreate() - Added information on how to get partitionId in foreachRDD Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6781 from tdas/SPARK-7284 and squashes the following commits: aac7be0 [Tathagata Das] Added information on how to get partition id a66ec22 [Tathagata Das] Complete the line incomplete line, a92ca39 [Tathagata Das] Updated streaming documentation
Diffstat (limited to 'docs/streaming-kafka-integration.md')
-rw-r--r--docs/streaming-kafka-integration.md12
1 files changed, 11 insertions, 1 deletions
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index 998c8c994e..02bc95d0e9 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -119,6 +119,13 @@ Next, we discuss how to use this approach in your streaming application.
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.kafka import KafkaUtils
+ directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
+
+ By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
+ </div>
</div>
In the Kafka parameters, you must specify either `metadata.broker.list` or `bootstrap.servers`.
@@ -147,10 +154,13 @@ Next, we discuss how to use this approach in your streaming application.
}
);
</div>
+ <div data-lang="python" markdown="1">
+ Not supported
</div>
+ </div>
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
Another thing to note is that since this approach does not use Receivers, the standard receiver-related (that is, [configurations](configuration.html) of the form `spark.streaming.receiver.*` ) will not apply to the input DStreams created by this approach (will apply to other input DStreams though). Instead, use the [configurations](configuration.html) `spark.streaming.kafka.*`. An important one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate at which each Kafka partition will be read by this direct API.
-3. **Deploying:** Similar to the first approach, you can package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR and the launch the application using `spark-submit`. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation.
+3. **Deploying:** This is same as the first approach, for Scala, Java and Python.