author     Chuan Shao <shaochuan@huawei.com>       2015-09-02 11:02:27 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>    2015-09-02 11:02:57 -0700
commit     c3b881a7d7e4736f7131ff002a80e25def1f63af (patch)
tree       d5f2b45963eb4f43ed15bb01f748217b192db0f5 /core
parent     00d9af5e190475affffb8b50467fcddfc40f50dc (diff)
[SPARK-7336] [HISTORYSERVER] Fix bug where application status is shown incorrectly on the JobHistory UI.

Author: ArcherShao <shaochuan@huawei.com>

Closes #5886 from ArcherShao/SPARK-7336.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala  27
1 file changed, 22 insertions, 5 deletions
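For context: the change below stops deriving the scan cutoff from the newest event log's modification time and instead reads the current time from the filesystem itself, by creating a hidden, throwaway marker file and taking its modification time. A minimal, self-contained sketch of that probe against the Hadoop FileSystem API follows; the object and method names (ScanTimeProbe, probeFileSystemTime) are illustrative only and do not appear in the patch.

// Standalone sketch of the patch's timestamp probe (hypothetical names, Hadoop FileSystem API).
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ScanTimeProbe {
  /** Returns the filesystem's current time by touching a hidden marker file under `dir`. */
  def probeFileSystemTime(fs: FileSystem, dir: String, fallback: Long): Long = {
    val path = new Path(dir, "." + UUID.randomUUID().toString)
    try {
      fs.create(path).close()                     // create an empty marker file
      fs.getFileStatus(path).getModificationTime  // its mtime reflects the filesystem clock
    } catch {
      case e: Exception =>
        e.printStackTrace()                       // the patch logs via logError instead
        fallback                                  // fall back to the previously recorded scan time
    } finally {
      fs.delete(path, false)                      // best-effort cleanup of the marker file
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    println(probeFileSystemTime(fs, "/tmp", -1L))
  }
}

The value obtained this way reflects the clock of the filesystem hosting the event logs, and it is what the renamed lastScanTime field records after each scan in the patch below.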
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index e573ff16c5..a5755eac36 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.history
 
 import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
+import java.util.UUID
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -73,7 +74,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // The modification time of the newest log detected during the last scan. This is used
   // to ignore logs that are older during subsequent scans, to avoid processing data that
   // is already known.
-  private var lastModifiedTime = -1L
+  private var lastScanTime = -1L
 
   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
   // into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -179,15 +180,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def checkForLogs(): Unit = {
     try {
+      val newLastScanTime = getNewLastScanTime()
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
-      var newLastModifiedTime = lastModifiedTime
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
           try {
             getModificationTime(entry).map { time =>
-              newLastModifiedTime = math.max(newLastModifiedTime, time)
-              time >= lastModifiedTime
+              time >= lastScanTime
             }.getOrElse(false)
           } catch {
             case e: AccessControlException =>
@@ -224,12 +224,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           }
         }
 
-      lastModifiedTime = newLastModifiedTime
+      lastScanTime = newLastScanTime
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }
 
+  private def getNewLastScanTime(): Long = {
+    val fileName = "." + UUID.randomUUID().toString
+    val path = new Path(logDir, fileName)
+    val fos = fs.create(path)
+
+    try {
+      fos.close()
+      fs.getFileStatus(path).getModificationTime
+    } catch {
+      case e: Exception =>
+        logError("Exception encountered when attempting to update last scan time", e)
+        lastScanTime
+    } finally {
+      fs.delete(path)
+    }
+  }
+
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],