diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-11-01 16:49:41 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-11-01 16:49:41 -0700 |
commit | b929537b6eb0f8f34497c3dbceea8045bf5dffdb (patch) | |
tree | c1c2375788a771fdc7e4117d2b65ac6067e06a32 | |
parent | 6e6298154aba63831a292117797798131a646869 (diff) | |
download | spark-b929537b6eb0f8f34497c3dbceea8045bf5dffdb.tar.gz spark-b929537b6eb0f8f34497c3dbceea8045bf5dffdb.tar.bz2 spark-b929537b6eb0f8f34497c3dbceea8045bf5dffdb.zip |
[SPARK-18182] Expose ReplayListenerBus.read() overload which takes string iterator
The `ReplayListenerBus.read()` method is used when implementing a custom `ApplicationHistoryProvider`. The current interface only exposes a `read()` method which takes an `InputStream` and performs stream-to-lines conversion itself, but it would also be useful to expose an overloaded method which accepts an iterator of strings, thereby enabling events to be provided from non-`InputStream` sources.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #15698 from JoshRosen/replay-listener-bus-interface.
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 2424586431..0bd5a6bc59 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { sourceName: String, maybeTruncated: Boolean = false, eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { + val lines = Source.fromInputStream(logData).getLines() + replay(lines, sourceName, maybeTruncated, eventsFilter) + } + /** + * Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an + * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations. + */ + def replay( + lines: Iterator[String], + sourceName: String, + maybeTruncated: Boolean, + eventsFilter: ReplayEventsFilter): Unit = { var currentLine: String = null var lineNumber: Int = 0 try { - val lineEntries = Source.fromInputStream(logData) - .getLines() + val lineEntries = lines .zipWithIndex .filter { case (line, _) => eventsFilter(line) } |