diff options
author | cody koeninger <cody@koeninger.org> | 2015-04-12 17:37:30 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-04-12 17:37:30 +0100 |
commit | 6ac8eea2fc6b782015236e4e7106e59d0d9e1b38 (patch) | |
tree | d4d62a676b921720584f51319f15363ec3f190b3 /external | |
parent | ddc17431a4108ab6efe0cd329d69e1f2fca5ac12 (diff) | |
download | spark-6ac8eea2fc6b782015236e4e7106e59d0d9e1b38.tar.gz spark-6ac8eea2fc6b782015236e4e7106e59d0d9e1b38.tar.bz2 spark-6ac8eea2fc6b782015236e4e7106e59d0d9e1b38.zip |
[SPARK-6431][Streaming][Kafka] Error message for partition metadata requ...
...ests
The original reported problem was misdiagnosed; the topic just didn't exist yet. Agreed upon solution was to improve error handling / message
Author: cody koeninger <cody@koeninger.org>
Closes #5454 from koeninger/spark-6431-master and squashes the following commits:
44300f8 [cody koeninger] [SPARK-6431][Streaming][Kafka] Error message for partition metadata requests
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala | 14 | ||||
-rw-r--r-- | external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala | 3 |
2 files changed, 14 insertions, 3 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index 2f7e0ab39f..bd767031c1 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -123,9 +123,17 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { val errs = new Err withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => val resp: TopicMetadataResponse = consumer.send(req) - // error codes here indicate missing / just created topic, - // repeating on a different broker wont be useful - return Right(resp.topicsMetadata.toSet) + val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError) + + if (respErrs.isEmpty) { + return Right(resp.topicsMetadata.toSet) + } else { + respErrs.foreach { m => + val cause = ErrorMapping.exceptionFor(m.errorCode) + val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?" + errs.append(new SparkException(msg, cause)) + } + } } Left(errs) } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index 2b33d2a220..7fb841b79c 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -52,6 +52,9 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll { val parts = kc.getPartitions(Set(topic)).right.get assert(parts(topicAndPartition), "didn't get partitions") + + val err = kc.getPartitions(Set(topic + "BAD")) + assert(err.isLeft, "getPartitions for a nonexistant topic should be an error") } test("leader offset apis") { |