aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 22
-rw-r--r-- core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala 18
2 files changed, 32 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0ae45f4ad9..92125f2df7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,9 +173,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
- val modTime = getModificationTime(entry)
- newLastModifiedTime = math.max(newLastModifiedTime, modTime)
- modTime >= lastModifiedTime
+ getModificationTime(entry).map { time =>
+ newLastModifiedTime = math.max(newLastModifiedTime, time)
+ time >= lastModifiedTime
+ }.getOrElse(false)
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -251,7 +252,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog),
+ getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
} finally {
@@ -310,11 +311,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
- private def getModificationTime(fsEntry: FileStatus): Long = {
- if (fsEntry.isDir) {
- fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+ /**
+ * Returns the latest modification time of the given event log (the newest entry for a legacy
+ * directory). `None` is returned for an empty directory, i.e. there is no event log there.
+ */
+ private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
+ if (isLegacyLogDirectory(fsEntry)) {
+ val statusList = fs.listStatus(fsEntry.getPath)
+ if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
} else {
- fsEntry.getModificationTime()
+ Some(fsEntry.getModificationTime())
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 3fbc1a21d1..1d95432258 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -190,6 +190,24 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
}
+ test("SPARK-5582: empty log directory") {
+ val conf = new SparkConf()
+ .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+ val provider = new FsHistoryProvider(conf)
+
+ val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+ writeFile(logFile1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationEnd(2L))
+
+ val oldLog = new File(testDir, "old1") // empty legacy-format log directory
+ oldLog.mkdir() should be (true) // fail loudly if the empty directory was not created
+
+ provider.checkForLogs()
+ val appList = provider.getListing()
+ appList.size should be (1) // only app1 is listed; the empty directory contributes nothing
+ }
+
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val out =