aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org
diff options
context:
space:
mode:
authorErgin Seyfe <eseyfe@fb.com>2016-12-08 10:21:09 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2016-12-08 10:21:09 -0800
commited8869ebbf39783b16daba2e2498a2bc1889306f (patch)
tree91ab01fcb7002ca98684ddc28ffa2e4d1e32d3b1 /core/src/test/scala/org
parentb44d1b8fcf00b238df434cf70ad09460b27adf07 (diff)
downloadspark-ed8869ebbf39783b16daba2e2498a2bc1889306f.tar.gz
spark-ed8869ebbf39783b16daba2e2498a2bc1889306f.tar.bz2
spark-ed8869ebbf39783b16daba2e2498a2bc1889306f.zip
[SPARK-8617][WEBUI] HistoryServer: Include in-progress files during cleanup
## What changes were proposed in this pull request? - Removed the `attempt.completed` filter so the cleaner would include the orphan in-progress files. - Use loading time for in-progress files as lastUpdated. Keep using the modTime for completed files. The first one will prevent deletion of in-progress job files. The second one will ensure that lastUpdated time won't change for completed jobs in the event of a HistoryServer reboot. ## How was this patch tested? Added new unit tests and via existing tests. Author: Ergin Seyfe <eseyfe@fb.com> Closes #16165 from seyfe/clear_old_inprogress_files.
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala50
1 file changed, 48 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 2c41c432d1..027f412c75 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -66,7 +66,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
test("Parse application logs") {
- val provider = new FsHistoryProvider(createTestConf())
+ val clock = new ManualClock(12345678)
+ val provider = new FsHistoryProvider(createTestConf(), clock)
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
@@ -109,12 +110,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}
+ // For completed files, lastUpdated would be lastModified time.
list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+
+ // For Inprogress files, lastUpdated would be current loading time.
list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
- newAppIncomplete.lastModified(), "test", false))
+ clock.getTimeMillis(), "test", false))
// Make sure the UI can be rendered.
list.foreach { case info =>
@@ -299,6 +303,48 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}
+ test("log cleaner for inProgress files") {
+ val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
+ val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
+ val maxAge = TimeUnit.SECONDS.toMillis(40)
+ val clock = new ManualClock(0)
+ val provider = new FsHistoryProvider(
+ createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+ val log1 = newLogFile("inProgressApp1", None, inProgress = true)
+ writeFile(log1, true, None,
+ SparkListenerApplicationStart(
+ "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
+ )
+
+ clock.setTime(firstFileModifiedTime)
+ provider.checkForLogs()
+
+ val log2 = newLogFile("inProgressApp2", None, inProgress = true)
+ writeFile(log2, true, None,
+ SparkListenerApplicationStart(
+ "inProgressApp2", Some("inProgressApp2"), 23L, "test2", Some("attempt2"))
+ )
+
+ clock.setTime(secondFileModifiedTime)
+ provider.checkForLogs()
+
+ // This should not trigger any cleanup
+ updateAndCheck(provider)(list => list.size should be(2))
+
+ // Should trigger cleanup for first file but not second one
+ clock.setTime(firstFileModifiedTime + maxAge + 1)
+ updateAndCheck(provider)(list => list.size should be(1))
+ assert(!log1.exists())
+ assert(log2.exists())
+
+ // Should cleanup the second file as well.
+ clock.setTime(secondFileModifiedTime + maxAge + 1)
+ updateAndCheck(provider)(list => list.size should be(0))
+ assert(!log1.exists())
+ assert(!log2.exists())
+ }
+
test("Event log copy") {
val provider = new FsHistoryProvider(createTestConf())
val logs = (1 to 2).map { i =>