path: root/docs/streaming-kafka-0-10-integration.md
author     Sean Owen <sowen@cloudera.com>  2017-02-16 12:32:45 +0000
committer  Sean Owen <sowen@cloudera.com>  2017-02-16 12:32:45 +0000
commit     0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree       1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /docs/streaming-kafka-0-10-integration.md
parent     3871d94a695d47169720e877f77ff1e4bede43ee (diff)
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings

For the future:

- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16871 from srowen/SPARK-19493.
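
All of the doc changes below follow the same mechanical pattern: Java 7 anonymous inner classes are rewritten as Java 8 lambdas. As an illustration only (not part of this patch), here is what that conversion looks like on an ordinary Spark transformation; `numbers` is an assumed `JavaRDD<Integer>` from the surrounding program:

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Java 7 style: a one-method anonymous inner class implementing Function
JavaRDD<Integer> doubledOld = numbers.map(new Function<Integer, Integer>() {
  @Override
  public Integer call(Integer x) {
    return x * 2;
  }
});

// Java 8 style: the same transformation expressed as a lambda
JavaRDD<Integer> doubledNew = numbers.map(x -> x * 2);
{% endhighlight %}
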
Diffstat (limited to 'docs/streaming-kafka-0-10-integration.md')
-rw-r--r--  docs/streaming-kafka-0-10-integration.md  62
1 file changed, 22 insertions(+), 40 deletions(-)
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 6ef54ac210..e383701316 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -68,20 +68,14 @@ kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
-final JavaInputDStream<ConsumerRecord<String, String>> stream =
+JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
-stream.mapToPair(
- new PairFunction<ConsumerRecord<String, String>, String, String>() {
- @Override
- public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
- return new Tuple2<>(record.key(), record.value());
- }
- })
+stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
{% endhighlight %}
</div>
</div>
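
For context, the updated snippet above still depends on `streamingContext` and `kafkaParams` defined earlier in the doc. A minimal self-contained sketch of the Java 8 version, with a hypothetical broker address, group id, and class name, could look like this:

{% highlight java %}
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class DirectStreamSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("DirectStreamSketch").setMaster("local[2]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5));

    // Hypothetical broker address and group id; replace with real values.
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "example-group");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("topicA", "topicB");

    JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    // Each record becomes a (key, value) pair, as in the updated example above.
    stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()))
          .print();

    streamingContext.start();
    streamingContext.awaitTermination();
  }
}
{% endhighlight %}
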
@@ -162,19 +156,13 @@ stream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
- @Override
- public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
- OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
- }
- });
- }
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ rdd.foreachPartition(consumerRecords -> {
+ OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
+ });
});
{% endhighlight %}
</div>
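
A side note on why the explicit `final` modifiers can be dropped in these rewrites: Java 8 allows lambdas to capture any effectively final local, so `offsetRanges` no longer needs the keyword that Java 7 anonymous classes required. A hedged sketch of the same offset-logging pattern factored into a helper method (the class and method names are illustrative, not from the doc):

{% highlight java %}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

public final class OffsetLogging {
  // Prints the offset range processed by each partition of every batch.
  public static void logOffsets(JavaInputDStream<ConsumerRecord<String, String>> stream) {
    stream.foreachRDD(rdd -> {
      // offsetRanges is effectively final, so the inner lambda may capture it
      // without the explicit `final` keyword that Java 7 required.
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
      rdd.foreachPartition(consumerRecords -> {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        System.out.println(
          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
      });
    });
  }
}
{% endhighlight %}
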
@@ -205,14 +193,11 @@ As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if calle
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- // some time later, after outputs have completed
- ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
- }
+ // some time later, after outputs have completed
+ ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
{% endhighlight %}
</div>
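
`CanCommitOffsets` also declares a `commitAsync` overload that takes Kafka's `OffsetCommitCallback`, which pairs naturally with the lambda style introduced here when you want to check whether a commit actually succeeded. A sketch under that assumption (the class and method names are illustrative):

{% highlight java %}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

public final class CommitWithCallback {
  // Commits each batch's offsets back to Kafka and logs failures via the callback overload.
  public static void commitAfterOutput(JavaInputDStream<ConsumerRecord<String, String>> stream) {
    stream.foreachRDD(rdd -> {
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

      // ... write the batch's results somewhere durable first ...

      ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
        if (exception != null) {
          System.err.println("Offset commit failed: " + exception);
        }
      });
    });
  }
}
{% endhighlight %}
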
@@ -268,21 +253,18 @@ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirec
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-
- Object results = yourCalculation(rdd);
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+ Object results = yourCalculation(rdd);
- // begin your transaction
+ // begin your transaction
- // update results
- // update offsets where the end of existing offsets matches the beginning of this batch of offsets
- // assert that offsets were updated correctly
+ // update results
+ // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+ // assert that offsets were updated correctly
- // end your transaction
- }
+ // end your transaction
});
{% endhighlight %}
</div>
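
The commented placeholders above ("begin your transaction", "update offsets where the end of existing offsets matches the beginning of this batch of offsets") are store-agnostic. One possible concrete reading, assuming a relational store reached over plain JDBC with a hypothetical `stream_offsets` table and a hypothetical `saveResults` helper, is sketched below; it is not part of the doc:

{% highlight java %}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.spark.streaming.kafka010.OffsetRange;

public final class TransactionalSink {
  // Writes one batch's results and its offsets in a single database transaction.
  // The JDBC URL, stream_offsets table, and saveResults helper are hypothetical.
  public static void storeBatch(Object results, OffsetRange[] offsetRanges) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/example_db")) {
      conn.setAutoCommit(false);  // begin your transaction

      // update results (hypothetical helper writing `results` inside the same transaction)
      // saveResults(conn, results);

      // update offsets where the end of existing offsets matches the beginning of this batch
      try (PreparedStatement ps = conn.prepareStatement(
          "UPDATE stream_offsets SET until_offset = ? " +
          "WHERE topic = ? AND kafka_partition = ? AND until_offset = ?")) {
        for (OffsetRange o : offsetRanges) {
          ps.setLong(1, o.untilOffset());
          ps.setString(2, o.topic());
          ps.setInt(3, o.partition());
          ps.setLong(4, o.fromOffset());
          // assert that offsets were updated correctly: exactly one row per topic-partition
          if (ps.executeUpdate() != 1) {
            throw new IllegalStateException(
              "Offset mismatch for " + o.topic() + "-" + o.partition());
          }
        }
      }

      conn.commit();  // end your transaction
    }
  }
}
{% endhighlight %}
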