aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2015-06-19 14:51:19 +0200
committerSean Owen <sowen@cloudera.com>2015-06-19 14:51:19 +0200
commit47af7c1ebfdbd7637f626ab07bf2bda6534f37ea (patch)
tree50f753c7498369e961a4bd28ebda2a1a3eede7b4 /external
parentebd363aecde977511469d47fb1ea7cb5df3c3541 (diff)
downloadspark-47af7c1ebfdbd7637f626ab07bf2bda6534f37ea.tar.gz
spark-47af7c1ebfdbd7637f626ab07bf2bda6534f37ea.tar.bz2
spark-47af7c1ebfdbd7637f626ab07bf2bda6534f37ea.zip
[SPARK-8389] [STREAMING] [KAFKA] Example of getting offset ranges out o…
…f the existing java direct stream api Author: cody koeninger <cody@koeninger.org> Closes #6846 from koeninger/SPARK-8389 and squashes the following commits: 3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java15
1 files changed, 13 insertions, 2 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index c0669fb336..3913b711ba 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -32,6 +32,7 @@ import org.junit.Test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -65,8 +66,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
@Test
public void testKafkaStream() throws InterruptedException {
- String topic1 = "topic1";
- String topic2 = "topic2";
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
@@ -87,6 +88,16 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
StringDecoder.class,
kafkaParams,
topicToSet(topic1)
+ ).transformToPair(
+ // Make sure you can get offset ranges from the rdd
+ new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
+ @Override
+ public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
+ OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
+ Assert.assertEquals(offsets[0].topic(), topic1);
+ return rdd;
+ }
+ }
).map(
new Function<Tuple2<String, String>, String>() {
@Override