aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-02-06 10:07:20 +0000
committerJosh Rosen <joshrosen@databricks.com>2015-02-06 10:43:34 -0800
commitfaccdcbc2c49c3e87b8d51ae571de245e2cdc80b (patch)
tree49dc51ac3868e07071ec2b6cbc9989dd79d4bc51 /core
parent25d80444c4e388457bbaa0faa25ac8872ac4e5fe (diff)
downloadspark-faccdcbc2c49c3e87b8d51ae571de245e2cdc80b.tar.gz
spark-faccdcbc2c49c3e87b8d51ae571de245e2cdc80b.tar.bz2
spark-faccdcbc2c49c3e87b8d51ae571de245e2cdc80b.zip
[SPARK-5582] [history] Ignore empty log directories.
Empty log directories are not useful at the moment, but if one ends up showing in the log root, it breaks the code that checks for log directories. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #4352 from vanzin/SPARK-5582 and squashes the following commits: 1a6a3d4 [Marcelo Vanzin] [SPARK-5582] Fix exception when looking at empty directories.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala22
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala18
2 files changed, 32 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0ae45f4ad9..92125f2df7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,9 +173,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
- val modTime = getModificationTime(entry)
- newLastModifiedTime = math.max(newLastModifiedTime, modTime)
- modTime >= lastModifiedTime
+ getModificationTime(entry).map { time =>
+ newLastModifiedTime = math.max(newLastModifiedTime, time)
+ time >= lastModifiedTime
+ }.getOrElse(false)
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -251,7 +252,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog),
+ getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
} finally {
@@ -310,11 +311,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
- private def getModificationTime(fsEntry: FileStatus): Long = {
- if (fsEntry.isDir) {
- fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+ /**
+ * Returns the modification time of the given event log. If the status points at an empty
+ * directory, `None` is returned, indicating that there isn't an event log at that location.
+ */
+ private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
+ if (isLegacyLogDirectory(fsEntry)) {
+ val statusList = fs.listStatus(fsEntry.getPath)
+ if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
} else {
- fsEntry.getModificationTime()
+ Some(fsEntry.getModificationTime())
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 3fbc1a21d1..1d95432258 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -190,6 +190,24 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
}
+ test("SPARK-5582: empty log directory") {
+ val conf = new SparkConf()
+ .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+ val provider = new FsHistoryProvider(conf)
+
+ val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+ writeFile(logFile1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationEnd(2L))
+
+ val oldLog = new File(testDir, "old1")
+ oldLog.mkdir()
+
+ provider.checkForLogs()
+ val appListAfterRename = provider.getListing()
+ appListAfterRename.size should be (1)
+ }
+
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val out =