about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-02-06 14:23:09 -0800
committerAndrew Or <andrew@databricks.com>2015-02-06 14:23:09 -0800
commit5687bab8fdfdc5345b8c5b9be8d4595299005fc8 (patch)
treed93f9dcfe3fe869b259ae4b7a31240024f9b5b00 /core
parentca66159a4f30d65fa4cd32dbf3ff23978cb7f99b (diff)
downloadspark-5687bab8fdfdc5345b8c5b9be8d4595299005fc8.tar.gz
spark-5687bab8fdfdc5345b8c5b9be8d4595299005fc8.tar.bz2
spark-5687bab8fdfdc5345b8c5b9be8d4595299005fc8.zip
[SPARK-5600] [core] Clean up FsHistoryProvider test, fix app sort order.
Clean up some test setup code to remove duplicate instantiation of the provider. Also make sure unfinished apps are sorted correctly. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #4370 from vanzin/SPARK-5600 and squashes the following commits: 0d048d5 [Marcelo Vanzin] Cleanup test code a bit. 2585119 [Marcelo Vanzin] Review feedback. 8b97544 [Marcelo Vanzin] Merge branch 'master' into SPARK-5600 be979e9 [Marcelo Vanzin] Merge branch 'master' into SPARK-5600 298371c [Marcelo Vanzin] [SPARK-5600] [core] Clean up FsHistoryProvider test, fix app sort order.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala85
2 files changed, 52 insertions, 48 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 92125f2df7..868c63d30a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -194,7 +194,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
None
}
}
- .sortBy { info => (-info.endTime, -info.startTime) }
+ .sortWith(compareAppInfo)
lastModifiedTime = newLastModifiedTime
@@ -214,7 +214,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val newIterator = logInfos.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
- if (newIterator.head.endTime > oldIterator.head.endTime) {
+ if (compareAppInfo(newIterator.head, oldIterator.head)) {
addIfAbsent(newIterator.next)
} else {
addIfAbsent(oldIterator.next)
@@ -231,6 +231,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
/**
+ * Comparison function that defines the sort order for the application listing.
+ *
+ * @return Whether `i1` should precede `i2`.
+ */
+ private def compareAppInfo(
+ i1: FsApplicationHistoryInfo,
+ i2: FsApplicationHistoryInfo): Boolean = {
+ if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+ }
+
+ /**
* Replays the events in the specified log file and returns information about the associated
* application.
*/
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 1d95432258..85939eaadc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -37,13 +37,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
private var testDir: File = null
- private var provider: FsHistoryProvider = null
-
before {
testDir = Utils.createTempDir()
- provider = new FsHistoryProvider(new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.history.fs.updateInterval", "0"))
}
after {
@@ -51,40 +46,41 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("Parse new and old application logs") {
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.history.fs.updateInterval", "0")
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val logFile1 = new File(testDir, "new1")
- writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test"),
- SparkListenerApplicationEnd(2L)
+ val newAppComplete = new File(testDir, "new1")
+ writeFile(newAppComplete, true, None,
+ SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+ SparkListenerApplicationEnd(4L)
)
// Write an unfinished app, new-style.
- val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
- writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app2-2", None, 1L, "test")
+ val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+ writeFile(newAppIncomplete, true, None,
+ SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
)
// Write an old-style application log.
- val oldLog = new File(testDir, "old1")
- oldLog.mkdir()
- createEmptyFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("app3", None, 2L, "test"),
+ val oldAppComplete = new File(testDir, "old1")
+ oldAppComplete.mkdir()
+ createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
+ writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
+ SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
SparkListenerApplicationEnd(3L)
)
- createEmptyFile(new File(oldLog, provider.APPLICATION_COMPLETE))
+ createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
+
+ // Check for logs so that we force the older unfinished app to be loaded, to make
+ // sure unfinished apps are also sorted correctly.
+ provider.checkForLogs()
// Write an unfinished app, old-style.
- val oldLog2 = new File(testDir, "old2")
- oldLog2.mkdir()
- createEmptyFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("app4", None, 2L, "test")
+ val oldAppIncomplete = new File(testDir, "old2")
+ oldAppIncomplete.mkdir()
+ createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
+ writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
+ SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
)
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -96,14 +92,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (4)
list.count(e => e.completed) should be (2)
- list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
- oldLog.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
- logFile1.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
- oldLog2.lastModified(), "test", false))
- list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
- logFile2.lastModified(), "test", false))
+ list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
+ newAppComplete.lastModified(), "test", true))
+ list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ oldAppComplete.lastModified(), "test", true))
+ list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
+ -1L, oldAppIncomplete.lastModified(), "test", false))
+ list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
+ -1L, newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
list.foreach { case info =>
@@ -113,6 +109,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("Parse legacy logs with compression codec set") {
+ val provider = new FsHistoryProvider(createTestConf())
val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
(classOf[SnappyCompressionCodec].getName(), true),
("invalid.codec", false))
@@ -156,10 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
)
logFile2.setReadable(false, false)
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.history.fs.updateInterval", "0")
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
provider.checkForLogs()
val list = provider.getListing().toSeq
@@ -168,10 +162,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("history file is renamed from inprogress to completed") {
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- .set("spark.testing", "true")
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
@@ -191,9 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-5582: empty log directory") {
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
- val provider = new FsHistoryProvider(conf)
+ val provider = new FsHistoryProvider(createTestConf())
val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
@@ -229,4 +218,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
new FileOutputStream(file).close()
}
+ private def createTestConf(): SparkConf = {
+ new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+ }
+
}