aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-05-01 09:50:55 -0500
committerImran Rashid <irashid@cloudera.com>2015-05-01 09:50:55 -0500
commit3052f4916e7f2c7fbc4837f00f4463b7d0b34718 (patch)
tree60796c615223dd96109cfcee74e6978528539425 /core/src/test
parent7fe0f3f2b46c61a5cc4af9166781624409fda8a4 (diff)
downloadspark-3052f4916e7f2c7fbc4837f00f4463b7d0b34718.tar.gz
spark-3052f4916e7f2c7fbc4837f00f4463b7d0b34718.tar.bz2
spark-3052f4916e7f2c7fbc4837f00f4463b7d0b34718.zip
[SPARK-4705] Handle multiple app attempts event logs, history server.
This change modifies the event logging listener to write the logs for different application attempts to different files. The attempt ID is set by the scheduler backend, so as long as the backend returns that ID to SparkContext, things should work. Currently, the YARN backend does that. The history server was also modified to model multiple attempts per application. Each attempt has its own UI and a separate row in the listing table, so that users can look at all the attempts separately. The UI "adapts" itself to avoid showing attempt-specific info when all the applications being shown have a single attempt. Author: Marcelo Vanzin <vanzin@cloudera.com> Author: twinkle sachdeva <twinkle@kite.ggn.in.guavus.com> Author: twinkle.sachdeva <twinkle.sachdeva@guavus.com> Author: twinkle sachdeva <twinkle.sachdeva@guavus.com> Closes #5432 from vanzin/SPARK-4705 and squashes the following commits: 7e289fa [Marcelo Vanzin] Review feedback. f66dcc5 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705 bc885b7 [Marcelo Vanzin] Review feedback. 76a3651 [Marcelo Vanzin] Fix log cleaner, add test. 7c381ec [Marcelo Vanzin] Merge branch 'master' into SPARK-4705 1aa309d [Marcelo Vanzin] Improve sorting of app attempts. 2ad77e7 [Marcelo Vanzin] Missed a reference to the old property name. 9d59d92 [Marcelo Vanzin] Scalastyle... d5a9c37 [Marcelo Vanzin] Update JsonProtocol test, make property name consistent. ba34b69 [Marcelo Vanzin] Use Option[String] for attempt id. f1cb9b3 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705 c14ec19 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705 9092d39 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705 86de638 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705 07446c6 [Marcelo Vanzin] Disable striping for app id / name when multiple attempts exist. 9092af5 [Marcelo Vanzin] Fix HistoryServer test. 3a14503 [Marcelo Vanzin] Argh scalastyle. 657ec18 [Marcelo Vanzin] Fix yarn history URL, app links. c3e0a82 [Marcelo Vanzin] Move app name to app info, more UI fixes. ce5ee5d [Marcelo Vanzin] Misc UI, test, style fixes. cbe8bba [Marcelo Vanzin] Attempt ID in listener event should be an option. 88b1de8 [Marcelo Vanzin] Add a test for apps with multiple attempts. 3245aa2 [Marcelo Vanzin] Make app attempts part of the history server model. 5fd5c6f [Marcelo Vanzin] Fix my broken rebase. 318525a [twinkle.sachdeva] SPARK-4705: 1) moved from directory structure to single file, as per the master branch. 2) Added the attempt id inside the SparkListenerApplicationStart, to make the info available independent of directory structure. 3) Changes in History Server to render the UI as per the snaphot II 6b2e521 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this 4c1fc26 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this 0eb7722 [twinkle sachdeva] SPARK-4705: Doing cherry-pick of fix into master
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala243
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala18
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala11
6 files changed, 212 insertions, 69 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9e367a0d9a..a0a0afa488 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.net.URI
+import java.util.concurrent.TimeUnit
import scala.io.Source
@@ -30,7 +31,7 @@ import org.scalatest.Matchers
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io._
import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -47,10 +48,11 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
/** Create a fake log file using the new log format used in Spark 1.3+ */
private def newLogFile(
appId: String,
+ appAttemptId: Option[String],
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
val logPath = new URI(logUri).getPath + ip
new File(logPath)
}
@@ -59,22 +61,23 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val newAppComplete = newLogFile("new1", inProgress = false)
+ val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
- SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(5L)
)
// Write a new-style application log.
- val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+ val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
+ Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
- SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
- val newAppIncomplete = newLogFile("new2", inProgress = true)
+ val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
- SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
+ SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
)
// Write an old-style application log.
@@ -82,7 +85,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
+ SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -96,33 +99,45 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
+ SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
)
// Force a reload of data from the log directory, and check that both logs are loaded.
// Take the opportunity to check that the offset checks work as expected.
- provider.checkForLogs()
+ updateAndCheck(provider) { list =>
+ list.size should be (5)
+ list.count(_.attempts.head.completed) should be (3)
+
+ def makeAppInfo(
+ id: String,
+ name: String,
+ start: Long,
+ end: Long,
+ lastMod: Long,
+ user: String,
+ completed: Boolean): ApplicationHistoryInfo = {
+ ApplicationHistoryInfo(id, name,
+ List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
+ }
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (5)
- list.count(_.completed) should be (3)
-
- list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
- newAppComplete.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
- "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
- oldAppComplete.lastModified(), "test", true))
- list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
- -1L, oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
- -1L, newAppIncomplete.lastModified(), "test", false))
-
- // Make sure the UI can be rendered.
- list.foreach { case info =>
- val appUi = provider.getAppUI(info.id)
- appUi should not be null
+ list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+ newAppComplete.lastModified(), "test", true))
+ list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
+ "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
+ true))
+ list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ oldAppComplete.lastModified(), "test", true))
+ list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
+ oldAppIncomplete.lastModified(), "test", false))
+ list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+ newAppIncomplete.lastModified(), "test", false))
+
+ // Make sure the UI can be rendered.
+ list.foreach { case info =>
+ val appUi = provider.getAppUI(info.id, None)
+ appUi should not be null
+ appUi should not be None
+ }
}
}
@@ -138,7 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
- SparkListenerApplicationStart("app2", None, 2L, "test"),
+ SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -159,52 +174,52 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-3697: ignore directories that cannot be read.") {
- val logFile1 = newLogFile("new1", inProgress = false)
+ val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- val logFile2 = newLogFile("new2", inProgress = false)
+ val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app1-2", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
val provider = new FsHistoryProvider(createTestConf())
- provider.checkForLogs()
-
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (1)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ }
}
test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- provider.checkForLogs()
- val appListBeforeRename = provider.getListing()
- appListBeforeRename.size should be (1)
- appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
+ endWith(EventLoggingListener.IN_PROGRESS)
+ }
- logFile1.renameTo(newLogFile("app1", inProgress = false))
- provider.checkForLogs()
- val appListAfterRename = provider.getListing()
- appListAfterRename.size should be (1)
- appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
+ logFile1.renameTo(newLogFile("app1", None, inProgress = false))
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not
+ endWith(EventLoggingListener.IN_PROGRESS)
+ }
}
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L))
val oldLog = new File(testDir, "old1")
@@ -215,6 +230,126 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.size should be (1)
}
+ test("apps with multiple attempts") {
+ val provider = new FsHistoryProvider(createTestConf())
+
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(attempt1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ }
+
+ val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ attempt2.delete()
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list should not be (null)
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(6L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ list.head.attempts.size should be (1)
+ list.last.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt1"))
+
+ list.foreach { case app =>
+ app.attempts.foreach { attempt =>
+ val appUi = provider.getAppUI(app.id, attempt.attemptId)
+ appUi should not be null
+ }
+ }
+
+ }
+ }
+
+ test("log cleaner") {
+ val maxAge = TimeUnit.SECONDS.toMillis(10)
+ val clock = new ManualClock(maxAge / 2)
+ val provider = new FsHistoryProvider(
+ createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+ val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(log1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+ log1.setLastModified(0L)
+
+ val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ writeFile(log2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+ log2.setLastModified(clock.getTimeMillis())
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ }
+
+ // Move the clock forward so log1 exceeds the max age.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+ assert(!log1.exists())
+
+ // Do the same for the other log.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (0)
+ }
+ assert(!log2.exists())
+ }
+
+ /**
+ * Asks the provider to check for logs and calls a function to perform checks on the updated
+ * app list. Example:
+ *
+ * updateAndCheck(provider) { list =>
+ * // asserts
+ * }
+ */
+ private def updateAndCheck(provider: FsHistoryProvider)
+ (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
+ provider.checkForLogs()
+ provider.cleanLogs()
+ checkFn(provider.getListing().toSeq)
+ }
+
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val fstream = new FileOutputStream(file)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 20de46fdab..71ba9c1825 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -36,7 +36,8 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
val request = mock[HttpServletRequest]
val ui = mock[SparkUI]
val link = "/history/app1"
- val info = new ApplicationHistoryInfo("app1", "app1", 0, 2, 1, "xxx", true)
+ val info = new ApplicationHistoryInfo("app1", "app1",
+ List(ApplicationAttemptInfo(None, 0, 2, 1, "xxx", true)))
when(historyServer.getApplicationList()).thenReturn(Seq(info))
when(ui.basePath).thenReturn(link)
when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3c52a8c446..2482603f42 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -95,6 +95,7 @@ class DAGSchedulerSuite
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
/** Length of time to wait while draining listener events. */
@@ -404,6 +405,7 @@ class DAGSchedulerSuite
taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
val noKillScheduler = new DAGScheduler(
sc,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6d25edb7d2..b52a8d11d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
- val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
eventLogger.start()
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
}
test("Log overwriting") {
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
@@ -108,18 +108,18 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
- Utils.resolveURI("/base-dir"), "app1"))
+ Utils.resolveURI("/base-dir"), "app1", None))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1"))
+ "a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1", Some("lz4")))
+ "a fine:mind$dollar{bills}.1", None, Some("lz4")))
}
/* ----------------- *
@@ -140,10 +140,10 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
- val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
@@ -186,7 +186,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI()
assert(eventLogPath === EventLoggingListener.getLogPath(
- expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
+ expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6de6d2fec6..dabe4574b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,7 +50,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
@@ -146,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* log the events.
*/
private class EventMonster(conf: SparkConf)
- extends EventLoggingListener("test", new URI("testdir"), conf) {
+ extends EventLoggingListener("test", None, new URI("testdir"), conf) {
override def start() { }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2d039cb75a..0c9cf5bc68 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -74,7 +74,8 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
- val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
+ val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
+ 42L, "Garfield", Some("appAttempt"))
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
@@ -274,9 +275,11 @@ class JsonProtocolSuite extends FunSuite {
test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
- val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+ // SparkListenerApplicationStart pre-Spark 1.4 does not have "appAttemptId".
+ val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user", None)
val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
.removeField({ _._1 == "App ID" })
+ .removeField({ _._1 == "App Attempt ID" })
assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
}
@@ -1497,8 +1500,10 @@ class JsonProtocolSuite extends FunSuite {
|{
| "Event": "SparkListenerApplicationStart",
| "App Name": "The winner of all",
+ | "App ID": "appId",
| "Timestamp": 42,
- | "User": "Garfield"
+ | "User": "Garfield",
+ | "App Attempt ID": "appAttempt"
|}
"""