about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorRyan Williams <ryan.blake.williams@gmail.com>2015-02-13 09:47:26 -0800
committerAndrew Or <andrew@databricks.com>2015-02-13 09:47:26 -0800
commitfc6d3e796a3c600e2f7827562455d555e59775ae (patch)
tree08f6c045a6c7d77886db38f42054e7829243eeaf
parente1a1ff8108463ca79299ec0eb555a0c8db9dffa0 (diff)
downloadspark-fc6d3e796a3c600e2f7827562455d555e59775ae.tar.gz
spark-fc6d3e796a3c600e2f7827562455d555e59775ae.tar.bz2
spark-fc6d3e796a3c600e2f7827562455d555e59775ae.zip
[SPARK-5783] Better eventlog-parsing error messages
Author: Ryan Williams <ryan.blake.williams@gmail.com>. Closes #4573 from ryan-williams/history and squashes the following commits: a8647ec [Ryan Williams] fix test calls to .replay(); 98aa3fe [Ryan Williams] include filename in history-parsing error message; 8deecf0 [Ryan Williams] add line number to history-parsing error message; b668b52 [Ryan Williams] add log info line to history-eventlog parsing
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala4
4 files changed, 11 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 868c63d30a..885fa0fdbf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -247,6 +247,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
val logPath = eventLog.getPath()
+ logInfo(s"Replaying log path: $logPath")
val (logInput, sparkVersion) =
if (isLegacyLogDirectory(eventLog)) {
openLegacyEventLog(logPath)
@@ -256,7 +257,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val appListener = new ApplicationEventListener
bus.addListener(appListener)
- bus.replay(logInput, sparkVersion)
+ bus.replay(logInput, sparkVersion, logPath.toString)
new FsApplicationHistoryInfo(
logPath.getName(),
appListener.appId.getOrElse(logPath.getName()),
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 53e453990f..8cc6ec1e81 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -761,7 +761,7 @@ private[spark] class Master(
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
try {
- replayBus.replay(logInput, sparkVersion)
+ replayBus.replay(logInput, sparkVersion, eventLogFile)
} finally {
logInput.close()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 584f4e7789..d9c3a10dc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -40,21 +40,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
*
* @param logData Stream containing event log data.
* @param version Spark version that generated the events.
+ * @param sourceName Filename (or other source identifier) from whence @logData is being read
*/
- def replay(logData: InputStream, version: String) {
+ def replay(logData: InputStream, version: String, sourceName: String) {
var currentLine: String = null
+ var lineNumber: Int = 1
try {
val lines = Source.fromInputStream(logData).getLines()
lines.foreach { line =>
currentLine = line
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
+ lineNumber += 1
}
} catch {
case ioe: IOException =>
throw ioe
case e: Exception =>
- logError("Exception in parsing Spark event log.", e)
- logError("Malformed line: %s\n".format(currentLine))
+ logError(s"Exception parsing Spark event log: $sourceName", e)
+ logError(s"Malformed line #$lineNumber: $currentLine\n")
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 7e360cc608..702c4cb3bd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
- replayer.replay(logData, SPARK_VERSION)
+ replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
} finally {
logData.close()
}
@@ -120,7 +120,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
- replayer.replay(logData, version)
+ replayer.replay(logData, version, eventLog.getPath().toString)
} finally {
logData.close()
}