aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala15
1 files changed, 13 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 2424586431..0bd5a6bc59 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
sourceName: String,
maybeTruncated: Boolean = false,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+ val lines = Source.fromInputStream(logData).getLines()
+ replay(lines, sourceName, maybeTruncated, eventsFilter)
+ }
+ /**
+ * Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an
+ * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations.
+ */
+ def replay(
+ lines: Iterator[String],
+ sourceName: String,
+ maybeTruncated: Boolean,
+ eventsFilter: ReplayEventsFilter): Unit = {
var currentLine: String = null
var lineNumber: Int = 0
try {
- val lineEntries = Source.fromInputStream(logData)
- .getLines()
+ val lineEntries = lines
.zipWithIndex
.filter { case (line, _) => eventsFilter(line) }