aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorjerluc <jeremyalucas@gmail.com>2015-05-18 18:13:29 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-18 18:13:29 -0700
commit0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626 (patch)
tree10a13e9ff68d6dfa1f4d49ee428ccf6b28717383 /external
parent4fb52f9545ae338fae2d3aeea4bfc35d5df44853 (diff)
downloadspark-0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626.tar.gz
spark-0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626.tar.bz2
spark-0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626.zip
[SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners
PR per [SPARK-7621](https://issues.apache.org/jira/browse/SPARK-7621), which makes both `KafkaReceiver` and `ReliableKafkaReceiver` report its errors to the `ReceiverTracker`, which in turn will add the events to the bus to fire off any registered `StreamingListener`s. Author: jerluc <jeremyalucas@gmail.com> Closes #6204 from jerluc/master and squashes the following commits: 82439a5 [jerluc] [SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala2
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala2
2 files changed, 2 insertions, 2 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index cca0fac023..04b2dc10d3 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -135,7 +135,7 @@ class KafkaReceiver[
store((msgAndMetadata.key, msgAndMetadata.message))
}
} catch {
- case e: Throwable => logError("Error handling message; exiting", e)
+ case e: Throwable => reportError("Error handling message; exiting", e)
}
}
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index ea87e96037..75f0dfc22b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -267,7 +267,7 @@ class ReliableKafkaReceiver[
}
} catch {
case e: Exception =>
- logError("Error handling message", e)
+ reportError("Error handling message", e)
}
}
}