aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/streaming-kafka-integration.md15
1 files changed, 14 insertions, 1 deletions
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index ab7f0117c0..b00351b2fb 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -181,7 +181,20 @@ Next, we discuss how to use this approach in your streaming application.
);
</div>
<div data-lang="python" markdown="1">
- Not supported yet
+ offsetRanges = []
+
+ def storeOffsetRanges(rdd):
+ global offsetRanges
+ offsetRanges = rdd.offsetRanges()
+ return rdd
+
+ def printOffsetRanges(rdd):
+ for o in offsetRanges:
+ print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
+
+ directKafkaStream\
+ .transform(storeOffsetRanges)\
+ .foreachRDD(printOffsetRanges)
</div>
</div>