aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala14
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala3
2 files changed, 14 insertions, 3 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index 2f7e0ab39f..bd767031c1 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -123,9 +123,17 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp: TopicMetadataResponse = consumer.send(req)
- // error codes here indicate missing / just created topic,
- // repeating on a different broker wont be useful
- return Right(resp.topicsMetadata.toSet)
+ val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
+
+ if (respErrs.isEmpty) {
+ return Right(resp.topicsMetadata.toSet)
+ } else {
+ respErrs.foreach { m =>
+ val cause = ErrorMapping.exceptionFor(m.errorCode)
+ val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
+ errs.append(new SparkException(msg, cause))
+ }
+ }
}
Left(errs)
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
index 2b33d2a220..7fb841b79c 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -52,6 +52,9 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll {
val parts = kc.getPartitions(Set(topic)).right.get
assert(parts(topicAndPartition), "didn't get partitions")
+
+ val err = kc.getPartitions(Set(topic + "BAD"))
+ assert(err.isLeft, "getPartitions for a nonexistant topic should be an error")
}
test("leader offset apis") {