Diffstat (limited to 'docs/streaming-kafka-0-8-integration.md')
-rw-r--r--  docs/streaming-kafka-0-8-integration.md  41
1 file changed, 15 insertions(+), 26 deletions(-)
diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md
index 58b17aa4ce..24a3e4cdbb 100644
--- a/docs/streaming-kafka-0-8-integration.md
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your streaming application.
</div>
<div data-lang="java" markdown="1">
// Hold a reference to the current offset ranges, so it can be used downstream
- final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
-
- directKafkaStream.transformToPair(
- new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
- @Override
- public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
- OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- offsetRanges.set(offsets);
- return rdd;
- }
- }
- ).map(
+ AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+ directKafkaStream.transformToPair(rdd -> {
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
+ return rdd;
+ }).map(
...
- ).foreachRDD(
- new Function<JavaPairRDD<String, String>, Void>() {
- @Override
- public Void call(JavaPairRDD<String, String> rdd) throws IOException {
- for (OffsetRange o : offsetRanges.get()) {
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
- );
- }
- ...
- return null;
- }
- }
- );
+ ).foreachRDD(rdd -> {
+ for (OffsetRange o : offsetRanges.get()) {
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+ );
+ }
+ ...
+ });
</div>
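
For reference, here is a self-contained sketch of the lambda-based pattern the hunk above introduces, assuming the Kafka 0.8 direct API (`KafkaUtils.createDirectStream` with `StringDecoder`). The class name, broker address, topic name, and batch interval are hypothetical, and the intermediate `.map(...)` stage elided in the doc is omitted here:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;

    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;

    public class DirectKafkaOffsetDemo {  // hypothetical class name
      public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("DirectKafkaOffsetDemo");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // hypothetical broker
        Set<String> topics = new HashSet<>(Arrays.asList("test")); // hypothetical topic

        JavaPairInputDStream<String, String> directKafkaStream =
            KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // Hold a reference to the current offset ranges, so it can be used downstream.
        AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

        // The cast to HasOffsetRanges only succeeds in the first method called on the
        // direct stream, before any shuffle or repartition breaks the one-to-one
        // mapping between RDD partitions and Kafka partitions.
        directKafkaStream.transformToPair(rdd -> {
          offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
          return rdd;
        }).foreachRDD(rdd -> {
          for (OffsetRange o : offsetRanges.get()) {
            System.out.println(
                o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
          }
        });

        jssc.start();
        jssc.awaitTermination();
      }
    }

Note that the transform runs on the driver, which is why a plain `AtomicReference` is enough to pass the offsets from the transform stage to the output action within each batch.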
<div data-lang="python" markdown="1">
offsetRanges = []