diff options
author | Sean Owen <sowen@cloudera.com> | 2017-02-16 12:32:45 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-16 12:32:45 +0000 |
commit | 0e2405490f2056728d1353abbac6f3ea177ae533 (patch) | |
tree | 1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /docs/streaming-kafka-0-10-integration.md | |
parent | 3871d94a695d47169720e877f77ff1e4bede43ee (diff) | |
download | spark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.gz spark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.bz2 spark-0e2405490f2056728d1353abbac6f3ea177ae533.zip |
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings
For the future:
- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes #16871 from srowen/SPARK-19493.
Diffstat (limited to 'docs/streaming-kafka-0-10-integration.md')
-rw-r--r-- | docs/streaming-kafka-0-10-integration.md | 62 |
1 files changed, 22 insertions, 40 deletions
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 6ef54ac210..e383701316 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -68,20 +68,14 @@ kafkaParams.put("enable.auto.commit", false); Collection<String> topics = Arrays.asList("topicA", "topicB"); -final JavaInputDStream<ConsumerRecord<String, String>> stream = +JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) ); -stream.mapToPair( - new PairFunction<ConsumerRecord<String, String>, String, String>() { - @Override - public Tuple2<String, String> call(ConsumerRecord<String, String> record) { - return new Tuple2<>(record.key(), record.value()); - } - }) +stream.mapToPair(record -> new Tuple2<>(record.key(), record.value())); {% endhighlight %} </div> </div> @@ -162,19 +156,13 @@ stream.foreachRDD { rdd => </div> <div data-lang="java" markdown="1"> {% highlight java %} -stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { - @Override - public void call(JavaRDD<ConsumerRecord<String, String>> rdd) { - final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() { - @Override - public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) { - OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; - System.out.println( - o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); - } - }); - } +stream.foreachRDD(rdd -> { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + rdd.foreachPartition(consumerRecords -> { + OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; + System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); + }); }); {% endhighlight %} </div> @@ -205,14 +193,11 @@ As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if calle </div> <div data-lang="java" markdown="1"> {% highlight java %} -stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { - @Override - public void call(JavaRDD<ConsumerRecord<String, String>> rdd) { - OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); +stream.foreachRDD(rdd -> { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - // some time later, after outputs have completed - ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); - } + // some time later, after outputs have completed + ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); }); {% endhighlight %} </div> @@ -268,21 +253,18 @@ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirec ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets) ); -stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { - @Override - public void call(JavaRDD<ConsumerRecord<String, String>> rdd) { - OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - - Object results = yourCalculation(rdd); +stream.foreachRDD(rdd -> { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + + Object results = yourCalculation(rdd); - // begin your transaction + // begin your transaction - // update results - // update offsets where the end of existing offsets matches the beginning of this batch of offsets - // assert that offsets were updated correctly + // update results + // update offsets where the end of existing offsets matches the beginning of this batch of offsets + // assert that offsets were updated correctly - // end your transaction - } + // end your transaction }); {% endhighlight %} </div> |