diff options
Diffstat (limited to 'docs/streaming-kafka-0-8-integration.md')
-rw-r--r-- | docs/streaming-kafka-0-8-integration.md | 41 |
1 files changed, 15 insertions, 26 deletions
diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md index 58b17aa4ce..24a3e4cdbb 100644 --- a/docs/streaming-kafka-0-8-integration.md +++ b/docs/streaming-kafka-0-8-integration.md @@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your streaming application. </div> <div data-lang="java" markdown="1"> // Hold a reference to the current offset ranges, so it can be used downstream - final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); - - directKafkaStream.transformToPair( - new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { - @Override - public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { - OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - offsetRanges.set(offsets); - return rdd; - } - } - ).map( + AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); + + directKafkaStream.transformToPair(rdd -> { + OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + offsetRanges.set(offsets); + return rdd; + }).map( ... - ).foreachRDD( - new Function<JavaPairRDD<String, String>, Void>() { - @Override - public Void call(JavaPairRDD<String, String> rdd) throws IOException { - for (OffsetRange o : offsetRanges.get()) { - System.out.println( - o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() - ); - } - ... - return null; - } - } - ); + ).foreachRDD(rdd -> { + for (OffsetRange o : offsetRanges.get()) { + System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() + ); + } + ... + }); </div> <div data-lang="python" markdown="1"> offsetRanges = [] |