author     cody koeninger <cody@koeninger.org>    2015-04-12 17:37:30 +0100
committer  Sean Owen <sowen@cloudera.com>         2015-04-12 17:37:30 +0100
commit     6ac8eea2fc6b782015236e4e7106e59d0d9e1b38 (patch)
tree       d4d62a676b921720584f51319f15363ec3f190b3 /external
parent     ddc17431a4108ab6efe0cd329d69e1f2fca5ac12 (diff)
[SPARK-6431][Streaming][Kafka] Error message for partition metadata requests

The original reported problem was misdiagnosed; the topic just didn't exist yet.
The agreed-upon solution was to improve the error handling / error message.

Author: cody koeninger <cody@koeninger.org>

Closes #5454 from koeninger/spark-6431-master and squashes the following commits:

44300f8 [cody koeninger] [SPARK-6431][Streaming][Kafka] Error message for partition metadata requests
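For illustration only (not part of the commit), here is a minimal caller-side sketch of how the improved error surfaces through KafkaCluster.getPartitions. The broker address and topic name are placeholders; KafkaCluster was not a public API at this point, so outside Spark's own sources the same pattern simply applies wherever the Either result is handled.

// Illustrative caller-side sketch, not part of this commit.
// Broker address and topic name are placeholders.
import org.apache.spark.streaming.kafka.KafkaCluster

object PartitionLookupSketch {
  def main(args: Array[String]): Unit = {
    val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))

    kc.getPartitions(Set("some-topic")) match {
      case Right(partitions) =>
        println(s"found partitions: $partitions")
      case Left(errs) =>
        // With this patch, a missing topic yields one SparkException per failed topic,
        // e.g. "Error getting partition metadata for 'some-topic'. Does the topic exist?",
        // with the underlying Kafka error attached as the cause.
        errs.foreach(e => println(s"${e.getMessage} (cause: ${e.getCause})"))
    }
  }
}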
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala      | 14
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala |  3
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index 2f7e0ab39f..bd767031c1 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -123,9 +123,17 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     val errs = new Err
     withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
       val resp: TopicMetadataResponse = consumer.send(req)
-      // error codes here indicate missing / just created topic,
-      // repeating on a different broker wont be useful
-      return Right(resp.topicsMetadata.toSet)
+      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
+
+      if (respErrs.isEmpty) {
+        return Right(resp.topicsMetadata.toSet)
+      } else {
+        respErrs.foreach { m =>
+          val cause = ErrorMapping.exceptionFor(m.errorCode)
+          val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
+          errs.append(new SparkException(msg, cause))
+        }
+      }
     }
     Left(errs)
   }
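Note that the new code attaches the exception Kafka maps to the error code (ErrorMapping.exceptionFor) as the cause of each SparkException. As an illustrative sketch only, a caller could branch on that cause to treat a missing topic differently from other metadata failures; the helper below is invented for this example and assumes the Kafka 0.8 client classes used by this module are on the classpath.

// Illustrative only: branching on the cause attached by KafkaCluster above.
// describeMetadataErrors is a made-up helper, not part of Spark.
import kafka.common.UnknownTopicOrPartitionException

object MetadataErrorSketch {
  def describeMetadataErrors(errs: Seq[Throwable]): Unit = errs.foreach { e =>
    e.getCause match {
      case _: UnknownTopicOrPartitionException =>
        // The common case behind SPARK-6431: the topic does not exist yet
        // (or is still being auto-created).
        println(s"Missing topic: ${e.getMessage}")
      case other =>
        println(s"Metadata request failed: ${e.getMessage} (cause: $other)")
    }
  }
}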
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
index 2b33d2a220..7fb841b79c 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -52,6 +52,9 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll {
     val parts = kc.getPartitions(Set(topic)).right.get
     assert(parts(topicAndPartition), "didn't get partitions")
+
+    val err = kc.getPartitions(Set(topic + "BAD"))
+    assert(err.isLeft, "getPartitions for a nonexistant topic should be an error")
   }
 
   test("leader offset apis") {