aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@databricks.com> 2016-11-01 16:49:41 -0700
committer: Reynold Xin <rxin@databricks.com> 2016-11-01 16:49:41 -0700
commit: b929537b6eb0f8f34497c3dbceea8045bf5dffdb (patch)
tree: c1c2375788a771fdc7e4117d2b65ac6067e06a32
parent: 6e6298154aba63831a292117797798131a646869 (diff)
download: spark-b929537b6eb0f8f34497c3dbceea8045bf5dffdb.tar.gz
spark-b929537b6eb0f8f34497c3dbceea8045bf5dffdb.tar.bz2
spark-b929537b6eb0f8f34497c3dbceea8045bf5dffdb.zip
[SPARK-18182] Expose ReplayListenerBus.replay() overload which takes string iterator
The `ReplayListenerBus.replay()` method is used when implementing a custom `ApplicationHistoryProvider`. The current interface only exposes a `replay()` method which takes an `InputStream` and performs stream-to-lines conversion itself, but it would also be useful to expose an overloaded method which accepts an iterator of strings, thereby enabling events to be provided from non-`InputStream` sources.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #15698 from JoshRosen/replay-listener-bus-interface.
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 15
1 files changed, 13 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 2424586431..0bd5a6bc59 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
sourceName: String,
maybeTruncated: Boolean = false,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+ val lines = Source.fromInputStream(logData).getLines()
+ replay(lines, sourceName, maybeTruncated, eventsFilter)
+ }
+ /**
+ * Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an
+ * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations.
+ */
+ def replay(
+ lines: Iterator[String],
+ sourceName: String,
+ maybeTruncated: Boolean,
+ eventsFilter: ReplayEventsFilter): Unit = {
var currentLine: String = null
var lineNumber: Int = 0
try {
- val lineEntries = Source.fromInputStream(logData)
- .getLines()
+ val lineEntries = lines
.zipWithIndex
.filter { case (line, _) => eventsFilter(line) }