aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2015-01-25 15:34:20 -0800
committerAndrew Or <andrew@databricks.com>2015-01-25 15:34:20 -0800
commit8f5c827b01026bf45fc774ed7387f11a941abea8 (patch)
treef6db03737fdf904b6a400b24116d78bb7573aab9
parent9f6435763d173d2abf82d16b5878983fa8bf3419 (diff)
downloadspark-8f5c827b01026bf45fc774ed7387f11a941abea8.tar.gz
spark-8f5c827b01026bf45fc774ed7387f11a941abea8.tar.bz2
spark-8f5c827b01026bf45fc774ed7387f11a941abea8.zip
[SPARK-5344][WebUI] HistoryServer cannot recognize that inprogress file was renamed to completed file
`FsHistoryProvider` tries to update application status but if `checkForLogs` is called before `.inprogress` file is renamed to completed file, the file is not recognized as completed. Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #4132 from sarutak/SPARK-5344 and squashes the following commits: 9658008 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5344 d2c72b6 [Kousuke Saruta] Fixed update issue of FsHistoryProvider
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala23
2 files changed, 26 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2b084a2d73..0ae45f4ad9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -203,7 +203,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (!logInfos.isEmpty) {
val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo) = {
- if (!newApps.contains(info.id)) {
+ if (!newApps.contains(info.id) ||
+ newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
+ !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
newApps += (info.id -> info)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 8379883e06..3fbc1a21d1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -167,6 +167,29 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (1)
}
+ test("history file is renamed from inprogress to completed") {
+ val conf = new SparkConf()
+ .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+ .set("spark.testing", "true")
+ val provider = new FsHistoryProvider(conf)
+
+ val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+ writeFile(logFile1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationEnd(2L)
+ )
+ provider.checkForLogs()
+ val appListBeforeRename = provider.getListing()
+ appListBeforeRename.size should be (1)
+ appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
+
+ logFile1.renameTo(new File(testDir, "app1"))
+ provider.checkForLogs()
+ val appListAfterRename = provider.getListing()
+ appListAfterRename.size should be (1)
+ appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
+ }
+
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val out =