aboutsummaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
Diffstat (limited to 'docs')
-rw-r--r--docs/streaming-flume-integration.md18
-rw-r--r--docs/streaming-programming-guide.md2
2 files changed, 19 insertions, 1 deletions
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index 8d6e743709..de0461010d 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.flume import FlumeUtils
+
+ flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
+
+ By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
+ </div>
</div>
Note that the hostname should be the same as the one used by the resource manager in the
@@ -135,6 +144,15 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent>flumeStream =
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.flume import FlumeUtils
+
+ addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
+ flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
+
+ By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
+ </div>
</div>
See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index b784d59666..e72d5580da 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}
<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
-out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
+out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.
This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts