author:    cody koeninger <cody@koeninger.org>    2015-06-19 18:54:07 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com>    2015-06-19 18:54:07 -0700
commit:    1b6fe9b1a70aa3f81448c2705ea3a4b501cbda9d
tree:      a287b75a28408bd0d831a9257873fa0995e6c670 /external/kafka/src/test
parent:    bec40e52be1bbe8fd6f3a1daa6284429d6b5d841
[SPARK-8127] [STREAMING] [KAFKA] KafkaRDD optimize count() take() isEmpty()
Take advantage of offset range info for size-related KafkaRDD methods. Possible fix for [SPARK-7122], but probably a worthwhile optimization regardless.

Author: cody koeninger <cody@koeninger.org>

Closes #6632 from koeninger/kafka-rdd-count and squashes the following commits:

321340d [cody koeninger] [SPARK-8127][Streaming][Kafka] additional test of ordering of take()
5a05d0f [cody koeninger] [SPARK-8127][Streaming][Kafka] additional test of isEmpty
f68bd32 [cody koeninger] [Streaming][Kafka][SPARK-8127] code cleanup
9555b73 [cody koeninger] Merge branch 'master' into kafka-rdd-count
253031d [cody koeninger] [Streaming][Kafka][SPARK-8127] mima exclusion for change to private method
8974b9e [cody koeninger] [Streaming][Kafka][SPARK-8127] check offset ranges before constructing KafkaRDD
c3768c5 [cody koeninger] [Streaming][Kafka] Take advantage of offset range info for size-related KafkaRDD methods. Possible fix for [SPARK-7122], but probably a worthwhile optimization regardless.
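For context, the observation behind the optimization: each KafkaRDD partition is defined by an OffsetRange, so its message count is known as untilOffset - fromOffset before a single message is read. A minimal standalone sketch of that arithmetic, using only OffsetRange's public fields (this is an illustration, not the actual KafkaRDD overrides, which live in the main source diff rather than this test diff):

    import org.apache.spark.streaming.kafka.OffsetRange

    // Each partition's size is implied by its offset range, so count()
    // and isEmpty() can be answered from metadata alone, without Kafka I/O.
    val ranges = Array(
      OffsetRange("topicbasic", 0, 0, 4),  // fromOffset 0, untilOffset 4 => 4 messages
      OffsetRange("topicbasic", 1, 10, 10) // fromOffset == untilOffset   => 0 messages
    )

    val total = ranges.map(r => r.untilOffset - r.fromOffset).sum  // 4
    val empty = total == 0                                         // false

The same reasoning bounds take(n): a partition can contribute at most its range's worth of records, which is why the test below expects take(messages.size + 10) to return exactly messages.size elements.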
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 26
1 file changed, 23 insertions(+), 3 deletions(-)
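The test changes below also exercise squashed commit 8974b9e, which validates offset ranges before a KafkaRDD is built. A rough sketch of what such a fail-fast check could look like; the helper name, signature, and error message are hypothetical, not Spark's actual (private) implementation:

    import org.apache.spark.SparkException
    import org.apache.spark.streaming.kafka.OffsetRange

    // Hypothetical helper: reject any range that reaches past the latest
    // offset the broker actually has, before constructing the RDD.
    def checkOffsets(ranges: Seq[OffsetRange], latestOffsets: Map[(String, Int), Long]): Unit = {
      ranges.foreach { r =>
        val latest = latestOffsets.getOrElse((r.topic, r.partition), 0L)
        if (r.untilOffset > latest) {
          throw new SparkException(
            s"Offsets for ${r.topic}-${r.partition} requested up to ${r.untilOffset}, " +
            s"but only $latest available")
        }
      }
    }

This is what the badRanges assertion in the diff relies on: asking for one message past the end of the topic must raise a SparkException at RDD construction time.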
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index d5baf5fd89..f52a738afd 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -55,8 +55,8 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
test("basic usage") {
val topic = s"topicbasic-${Random.nextInt}"
kafkaTestUtils.createTopic(topic)
- val messages = Set("the", "quick", "brown", "fox")
- kafkaTestUtils.sendMessages(topic, messages.toArray)
+ val messages = Array("the", "quick", "brown", "fox")
+ kafkaTestUtils.sendMessages(topic, messages)
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt}")
@@ -67,7 +67,27 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
       sc, kafkaParams, offsetRanges)
 
     val received = rdd.map(_._2).collect.toSet
-    assert(received === messages)
+    assert(received === messages.toSet)
+
+    // size-related method optimizations return sane results
+    assert(rdd.count === messages.size)
+    assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+    assert(!rdd.isEmpty)
+    assert(rdd.take(1).size === 1)
+    assert(rdd.take(1).head._2 === messages.head)
+    assert(rdd.take(messages.size + 10).size === messages.size)
+
+    val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+      sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))
+
+    assert(emptyRdd.isEmpty)
+
+    // invalid offset ranges throw exceptions
+    val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+    intercept[SparkException] {
+      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+        sc, kafkaParams, badRanges)
+    }
   }
 
   test("iterator boundary conditions") {