author     cody koeninger <cody@koeninger.org>          2015-06-19 17:16:56 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-06-19 17:18:31 -0700
commit     b305e377fb0a2ca67d9924b995c51e483a4944ad (patch)
tree       6cc1be972780d60a7df154dacecdf1009f0fcc21 /external
parent     a333a72e029d2546a66b36d6b3458e965430c530 (diff)
[SPARK-8390] [STREAMING] [KAFKA] fix docs related to HasOffsetRanges
Author: cody koeninger <cody@koeninger.org>

Closes #6863 from koeninger/SPARK-8390 and squashes the following commits:

26a06bd [cody koeninger] Merge branch 'master' into SPARK-8390
3744492 [cody koeninger] [Streaming][Kafka][SPARK-8390] doc changes per TD, test to make sure approach shown in docs actually compiles + runs
b108c9d [cody koeninger] [Streaming][Kafka][SPARK-8390] further doc fixes, clean up spacing
bb4336b [cody koeninger] [Streaming][Kafka][SPARK-8390] fix docs related to HasOffsetRanges, cleanup
3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api
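The pattern these tests exercise, and that the updated docs describe, is to capture the offset ranges inside transform() and only consume them downstream. A minimal Scala sketch of that usage, assuming a StreamingContext named ssc, a kafkaParams map, and a topics set are already defined (those names are placeholders, not part of this patch):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// ssc, kafkaParams and topics are assumed to be defined elsewhere (placeholders).
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// hold a reference to the current offset ranges, so they can be used downstream
var offsetRanges = Array.empty[OffsetRange]

stream.transform { rdd =>
  // The cast succeeds here because this is the RDD produced directly by the
  // direct stream, before any shuffle breaks the partition mapping.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

Capturing the ranges in transform() rather than after other transformations matters because the cast to HasOffsetRanges only works on the RDD created directly by the direct stream, while the one-to-one mapping between Kafka partitions and RDD partitions still holds.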
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java  11
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala    16
2 files changed, 22 insertions(+), 5 deletions(-)
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 3913b711ba..02cd24a359 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka;
import java.io.Serializable;
import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
import scala.Tuple2;
@@ -68,6 +69,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
public void testKafkaStream() throws InterruptedException {
final String topic1 = "topic1";
final String topic2 = "topic2";
+ // hold a reference to the current offset ranges, so it can be used downstream
+ final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
@@ -93,7 +96,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
- OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
Assert.assertEquals(offsets[0].topic(), topic1);
return rdd;
}
@@ -131,6 +135,11 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
result.addAll(rdd.collect());
+ for (OffsetRange o : offsetRanges.get()) {
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+ );
+ }
return null;
}
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 212eb35c61..8e1715f6db 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -102,13 +102,21 @@ class DirectKafkaStreamSuite
val allReceived =
new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
- stream.foreachRDD { rdd =>
- // Get the offset ranges in the RDD
- val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ // hold a reference to the current offset ranges, so it can be used downstream
+ var offsetRanges = Array[OffsetRange]()
+
+ stream.transform { rdd =>
+ // Get the offset ranges in the RDD
+ offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ rdd
+ }.foreachRDD { rdd =>
+ for (o <- offsetRanges) {
+ println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+ }
val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
// For each partition, get size of the range in the partition,
// and the number of items in the partition
- val off = offsets(i)
+ val off = offsetRanges(i)
val all = iter.toSeq
val partSize = all.size
val rangeSize = off.untilOffset - off.fromOffset