Diffstat (limited to 'docs/streaming-flume-integration.md')
 docs/streaming-flume-integration.md | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+), 0 deletions(-)
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index 8d6e743709..de0461010d 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.flume import FlumeUtils
+
+ flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
+
+ By default, the Python API decodes the Flume event body as UTF-8 encoded strings. You can specify a custom decoding function to convert the body byte arrays in Flume events to an arbitrary data type, as sketched below.
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
+ </div>
</div>
Note that the hostname should be the same as the one used by the resource manager in the
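The added paragraph mentions supplying a custom decoding function for the event body. A minimal Python sketch of that idea follows, assuming the decoder can be passed through the `bodyDecoder` keyword argument of `FlumeUtils.createStream` and that the event bodies carry JSON; the host name, port, and application name are placeholders rather than values from the patch.

    import json

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    def json_decoder(body):
        # 'body' is the raw Flume event body (bytes); return any Python object.
        return json.loads(body.decode("utf-8"))

    sc = SparkContext(appName="FlumeJsonEventCount")  # placeholder app name
    ssc = StreamingContext(sc, 2)                     # 2-second batches

    # Push-based receiver; hostname and port are placeholders (assumption).
    flumeStream = FlumeUtils.createStream(ssc, "localhost", 4141,
                                          bodyDecoder=json_decoder)

    # Each element is a (headers, body) pair; print the decoded bodies.
    flumeStream.map(lambda event: event[1]).pprint()

    ssc.start()
    ssc.awaitTermination()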
@@ -135,6 +144,15 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.flume import FlumeUtils
+
+ addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
+ flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
+
+ By default, the Python API decodes the Flume event body as UTF-8 encoded strings. You can specify a custom decoding function to convert the body byte arrays in Flume events to an arbitrary data type, as sketched below.
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
+ </div>
</div>
See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
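For the pull-based approach added in the second hunk, a companion Python sketch in the spirit of FlumePollingEventCount might look like the following; the sink host names and ports are placeholders, and the default UTF-8 body decoding is left in place.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    sc = SparkContext(appName="FlumePollingEventCount")  # placeholder app name
    ssc = StreamingContext(sc, 2)                         # 2-second batches

    # Pull-based: Spark polls the custom sinks at these addresses (placeholders).
    addresses = [("sink-host-1", 9999), ("sink-host-2", 9999)]
    flumeStream = FlumeUtils.createPollingStream(ssc, addresses)

    # Count the Flume events received in each batch.
    flumeStream.count().map(
        lambda c: "Received %d flume events." % c).pprint()

    ssc.start()
    ssc.awaitTermination()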