Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala  243
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala  3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala  18
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala  11
6 files changed, 212 insertions, 69 deletions
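
For orientation: this change threads an optional application attempt ID through event logging and the history server, so one application can surface several attempts in the listing. The sketch below shows the listing shape the updated tests assert against; the field names are inferred from the constructor calls in the hunks that follow and are assumptions, not Spark's actual definitions.

    object HistoryListingSketch {
      // Assumed shapes, reconstructed from the test code below.
      case class ApplicationAttemptInfo(
          attemptId: Option[String],
          startTime: Long,
          endTime: Long,
          lastUpdated: Long,
          sparkUser: String,
          completed: Boolean)

      case class ApplicationHistoryInfo(
          id: String,
          name: String,
          attempts: List[ApplicationAttemptInfo])

      // One application that ran twice; the tests below expect the most
      // recent attempt at the head of the list.
      val app = ApplicationHistoryInfo("app1", "app1", List(
        ApplicationAttemptInfo(Some("attempt2"), 3L, 4L, 4L, "test", completed = true),
        ApplicationAttemptInfo(Some("attempt1"), 1L, 2L, 2L, "test", completed = true)))
    }
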
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9e367a0d9a..a0a0afa488 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.net.URI
+import java.util.concurrent.TimeUnit
import scala.io.Source
@@ -30,7 +31,7 @@ import org.scalatest.Matchers
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io._
import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -47,10 +48,11 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
/** Create a fake log file using the new log format used in Spark 1.3+ */
private def newLogFile(
appId: String,
+ appAttemptId: Option[String],
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
val logPath = new URI(logUri).getPath + ip
new File(logPath)
}
@@ -59,22 +61,23 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val newAppComplete = newLogFile("new1", inProgress = false)
+ val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
- SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(5L)
)
// Write a new-style, compressed application log.
- val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+ val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
+ Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
- SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+ SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
- val newAppIncomplete = newLogFile("new2", inProgress = true)
+ val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
- SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
+ SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
)
// Write an old-style application log.
@@ -82,7 +85,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
+ SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -96,33 +99,45 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
- SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
+ SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
)
// Force a reload of data from the log directory, and check that both logs are loaded.
// Take the opportunity to check that the offset checks work as expected.
- provider.checkForLogs()
+ updateAndCheck(provider) { list =>
+ list.size should be (5)
+ list.count(_.attempts.head.completed) should be (3)
+
+ def makeAppInfo(
+ id: String,
+ name: String,
+ start: Long,
+ end: Long,
+ lastMod: Long,
+ user: String,
+ completed: Boolean): ApplicationHistoryInfo = {
+ ApplicationHistoryInfo(id, name,
+ List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
+ }
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (5)
- list.count(_.completed) should be (3)
-
- list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
- newAppComplete.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
- "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
- oldAppComplete.lastModified(), "test", true))
- list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
- -1L, oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
- -1L, newAppIncomplete.lastModified(), "test", false))
-
- // Make sure the UI can be rendered.
- list.foreach { case info =>
- val appUi = provider.getAppUI(info.id)
- appUi should not be null
+ list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+ newAppComplete.lastModified(), "test", true))
+ list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
+ "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
+ true))
+ list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ oldAppComplete.lastModified(), "test", true))
+ list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
+ oldAppIncomplete.lastModified(), "test", false))
+ list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+ newAppIncomplete.lastModified(), "test", false))
+
+ // Make sure the UI can be rendered.
+ list.foreach { case info =>
+ val appUi = provider.getAppUI(info.id, None)
+ appUi should not be null
+ appUi should not be None
+ }
}
}
@@ -138,7 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
- SparkListenerApplicationStart("app2", None, 2L, "test"),
+ SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -159,52 +174,52 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-3697: ignore directories that cannot be read.") {
- val logFile1 = newLogFile("new1", inProgress = false)
+ val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1-1", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- val logFile2 = newLogFile("new2", inProgress = false)
+ val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
- SparkListenerApplicationStart("app1-2", None, 1L, "test"),
+ SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
val provider = new FsHistoryProvider(createTestConf())
- provider.checkForLogs()
-
- val list = provider.getListing().toSeq
- list should not be (null)
- list.size should be (1)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ }
}
test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- provider.checkForLogs()
- val appListBeforeRename = provider.getListing()
- appListBeforeRename.size should be (1)
- appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
+ endWith(EventLoggingListener.IN_PROGRESS)
+ }
- logFile1.renameTo(newLogFile("app1", inProgress = false))
- provider.checkForLogs()
- val appListAfterRename = provider.getListing()
- appListAfterRename.size should be (1)
- appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
+ logFile1.renameTo(newLogFile("app1", None, inProgress = false))
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+      val attemptLogPath = list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath
+      attemptLogPath should not endWith (EventLoggingListener.IN_PROGRESS)
+ }
}
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = newLogFile("app1", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L))
val oldLog = new File(testDir, "old1")
@@ -215,6 +230,126 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.size should be (1)
}
+ test("apps with multiple attempts") {
+ val provider = new FsHistoryProvider(createTestConf())
+
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(attempt1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ }
+
+ val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ attempt2.delete()
+    writeFile(completedAttempt2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list should not be (null)
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+
+ val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
+    writeFile(app2Attempt1, true, None,
+ SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(6L)
+ )
+
+ updateAndCheck(provider) { list =>
+ list.size should be (2)
+ list.head.attempts.size should be (1)
+ list.last.attempts.size should be (2)
+ list.head.attempts.head.attemptId should be (Some("attempt1"))
+
+ list.foreach { case app =>
+ app.attempts.foreach { attempt =>
+ val appUi = provider.getAppUI(app.id, attempt.attemptId)
+ appUi should not be null
+ }
+ }
+
+ }
+ }
+
+ test("log cleaner") {
+ val maxAge = TimeUnit.SECONDS.toMillis(10)
+ val clock = new ManualClock(maxAge / 2)
+ val provider = new FsHistoryProvider(
+ createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+ val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ writeFile(log1, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+ SparkListenerApplicationEnd(2L)
+ )
+ log1.setLastModified(0L)
+
+ val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+ writeFile(log2, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ SparkListenerApplicationEnd(4L)
+ )
+ log2.setLastModified(clock.getTimeMillis())
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (2)
+ }
+
+ // Move the clock forward so log1 exceeds the max age.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
+ }
+ assert(!log1.exists())
+
+ // Do the same for the other log.
+ clock.advance(maxAge)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (0)
+ }
+ assert(!log2.exists())
+ }
+
+ /**
+ * Asks the provider to check for logs and calls a function to perform checks on the updated
+ * app list. Example:
+ *
+ * updateAndCheck(provider) { list =>
+ * // asserts
+ * }
+ */
+ private def updateAndCheck(provider: FsHistoryProvider)
+ (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
+ provider.checkForLogs()
+ provider.cleanLogs()
+ checkFn(provider.getListing().toSeq)
+ }
+
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val fstream = new FileOutputStream(file)
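
A note on the "log cleaner" test above: it drives expiry with a ManualClock rather than real time, so the age check is deterministic. A compact sketch of that pattern, assuming only the getTimeMillis()/advance() calls the test itself uses; the expiry rule here is a stand-in, not Spark's cleaner implementation.

    object CleanerClockSketch extends App {
      // Deterministic clock: time moves only when the test advances it.
      class ManualClock(private var now: Long) {
        def getTimeMillis(): Long = now
        def advance(ms: Long): Unit = { now += ms }
      }

      val maxAge = 10000L
      val clock = new ManualClock(maxAge / 2)
      def expired(lastModified: Long): Boolean =
        clock.getTimeMillis() - lastModified > maxAge

      val log1 = 0L                    // modified "long ago", like log1.setLastModified(0L)
      val log2 = clock.getTimeMillis() // modified "now"

      assert(!expired(log1) && !expired(log2)) // both still within maxAge
      clock.advance(maxAge)                    // only log1 now exceeds maxAge
      assert(expired(log1) && !expired(log2))
    }
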
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 20de46fdab..71ba9c1825 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -36,7 +36,8 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
val request = mock[HttpServletRequest]
val ui = mock[SparkUI]
val link = "/history/app1"
- val info = new ApplicationHistoryInfo("app1", "app1", 0, 2, 1, "xxx", true)
+ val info = new ApplicationHistoryInfo("app1", "app1",
+ List(ApplicationAttemptInfo(None, 0, 2, 1, "xxx", true)))
when(historyServer.getApplicationList()).thenReturn(Seq(info))
when(ui.basePath).thenReturn(link)
when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3c52a8c446..2482603f42 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -95,6 +95,7 @@ class DAGSchedulerSuite
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
/** Length of time to wait while draining listener events. */
@@ -404,6 +405,7 @@ class DAGSchedulerSuite
taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
val noKillScheduler = new DAGScheduler(
sc,
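
Both scheduler stubs in this suite gain the same one-line override, which suggests the TaskScheduler trait now declares applicationAttemptId() abstractly, so every concrete stub must provide it. A minimal illustration; the trait shape is assumed from the hunks, not copied from Spark.

    object SchedulerStubSketch {
      trait TaskScheduler {
        // Newly abstract member: every anonymous stub must now implement it.
        def applicationAttemptId(): Option[String]
        def defaultParallelism(): Int
      }

      // Mirrors the anonymous stubs in DAGSchedulerSuite.
      val stubScheduler: TaskScheduler = new TaskScheduler {
        override def defaultParallelism(): Int = 2
        override def applicationAttemptId(): Option[String] = None
      }
    }
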
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6d25edb7d2..b52a8d11d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
- val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
eventLogger.start()
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
}
test("Log overwriting") {
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
@@ -108,18 +108,18 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
- Utils.resolveURI("/base-dir"), "app1"))
+ Utils.resolveURI("/base-dir"), "app1", None))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1"))
+ "a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
- "a fine:mind$dollar{bills}.1", Some("lz4")))
+ "a fine:mind$dollar{bills}.1", None, Some("lz4")))
}
/* ----------------- *
@@ -140,10 +140,10 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
- val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
@@ -186,7 +186,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI()
assert(eventLogPath === EventLoggingListener.getLogPath(
- expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
+ expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
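
The "Event log name" assertions above pin down the file-naming rule. Below is a hedged reconstruction: the sanitization matches the expected strings in the test, while the "_<attemptId>" suffix is an assumption, since these asserts only pass attemptId = None.

    object LogPathSketch extends App {
      // Reconstruction from the asserts above, not Spark's actual implementation.
      def getLogPath(
          base: String,
          appId: String,
          attemptId: Option[String],
          codecName: Option[String] = None): String = {
        // Spaces, colons and slashes become "-"; other unfriendly chars become "_".
        def sanitize(s: String): String =
          s.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
        val name = sanitize(appId) + attemptId.map(id => "_" + sanitize(id)).getOrElse("")
        base.stripSuffix("/") + "/" + name + codecName.map("." + _).getOrElse("")
      }

      assert(getLogPath("file:/base-dir", "app1", None) == "file:/base-dir/app1")
      assert(getLogPath("file:/base-dir", "app1", None, Some("lzf")) == "file:/base-dir/app1.lzf")
      assert(getLogPath("file:/base-dir", "a fine:mind$dollar{bills}.1", None, Some("lz4")) ==
        "file:/base-dir/a-fine-mind_dollar_bills__1.lz4")
    }
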
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6de6d2fec6..dabe4574b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,7 +50,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
- 125L, "Mickey")
+ 125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
@@ -146,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* log the events.
*/
private class EventMonster(conf: SparkConf)
- extends EventLoggingListener("test", new URI("testdir"), conf) {
+ extends EventLoggingListener("test", None, new URI("testdir"), conf) {
override def start() { }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2d039cb75a..0c9cf5bc68 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -74,7 +74,8 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
- val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
+ val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
+ 42L, "Garfield", Some("appAttempt"))
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
@@ -274,9 +275,11 @@ class JsonProtocolSuite extends FunSuite {
test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart in Spark 1.0.0 did not have an "appId" property.
- val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+    // SparkListenerApplicationStart pre-Spark 1.4 did not have an "appAttemptId" property.
+ val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user", None)
val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
.removeField({ _._1 == "App ID" })
+ .removeField({ _._1 == "App Attempt ID" })
assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
}
@@ -1497,8 +1500,10 @@ class JsonProtocolSuite extends FunSuite {
|{
| "Event": "SparkListenerApplicationStart",
| "App Name": "The winner of all",
+ | "App ID": "appId",
| "Timestamp": 42,
- | "User": "Garfield"
+ | "User": "Garfield",
+ | "App Attempt ID": "appAttempt"
|}
"""