diff options
author | cody koeninger <cody@koeninger.org> | 2015-06-19 17:16:56 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-06-19 17:18:31 -0700 |
commit | b305e377fb0a2ca67d9924b995c51e483a4944ad (patch) | |
tree | 6cc1be972780d60a7df154dacecdf1009f0fcc21 /external | |
parent | a333a72e029d2546a66b36d6b3458e965430c530 (diff) | |
download | spark-b305e377fb0a2ca67d9924b995c51e483a4944ad.tar.gz spark-b305e377fb0a2ca67d9924b995c51e483a4944ad.tar.bz2 spark-b305e377fb0a2ca67d9924b995c51e483a4944ad.zip |
[SPARK-8390] [STREAMING] [KAFKA] fix docs related to HasOffsetRanges
Author: cody koeninger <cody@koeninger.org>
Closes #6863 from koeninger/SPARK-8390 and squashes the following commits:
26a06bd [cody koeninger] Merge branch 'master' into SPARK-8390
3744492 [cody koeninger] [Streaming][Kafka][SPARK-8390] doc changes per TD, test to make sure approach shown in docs actually compiles + runs
b108c9d [cody koeninger] [Streaming][Kafka][SPARK-8390] further doc fixes, clean up spacing
bb4336b [cody koeninger] [Streaming][Kafka][SPARK-8390] fix docs related to HasOffsetRanges, cleanup
3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api
Diffstat (limited to 'external')
2 files changed, 22 insertions, 5 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 3913b711ba..02cd24a359 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; import java.util.*; +import java.util.concurrent.atomic.AtomicReference; import scala.Tuple2; @@ -68,6 +69,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable { public void testKafkaStream() throws InterruptedException { final String topic1 = "topic1"; final String topic2 = "topic2"; + // hold a reference to the current offset ranges, so it can be used downstream + final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference(); String[] topic1data = createTopicAndSendData(topic1); String[] topic2data = createTopicAndSendData(topic2); @@ -93,7 +96,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable { new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { - OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges(); + OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + offsetRanges.set(offsets); Assert.assertEquals(offsets[0].topic(), topic1); return rdd; } @@ -131,6 +135,11 @@ public class JavaDirectKafkaStreamSuite implements Serializable { @Override public Void call(JavaRDD<String> rdd) throws Exception { result.addAll(rdd.collect()); + for (OffsetRange o : offsetRanges.get()) { + System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() + ); + } return null; } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 212eb35c61..8e1715f6db 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -102,13 +102,21 @@ class DirectKafkaStreamSuite val allReceived = new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] - stream.foreachRDD { rdd => - // Get the offset ranges in the RDD - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + // hold a reference to the current offset ranges, so it can be used downstream + var offsetRanges = Array[OffsetRange]() + + stream.transform { rdd => + // Get the offset ranges in the RDD + offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + rdd + }.foreachRDD { rdd => + for (o <- offsetRanges) { + println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") + } val collected = rdd.mapPartitionsWithIndex { (i, iter) => // For each partition, get size of the range in the partition, // and the number of items in the partition - val off = offsets(i) + val off = offsetRanges(i) val all = iter.toSeq val partSize = all.size val rangeSize = off.untilOffset - off.fromOffset |