author     felixcheung <felixcheung_m@hotmail.com>  2016-02-01 16:55:21 -0800
committer  Andrew Or <andrew@databricks.com>        2016-02-01 16:55:21 -0800
commit     0df3cfb8ab4d584c95db6c340694e199d7b59e9e (patch)
tree       b1cef848003d0c4575dd2967721ba4f2a7a693cb
parent     715a19d56fc934d4aec5025739ff650daf4580b7 (diff)
[SPARK-12790][CORE] Remove HistoryServer old multiple files format
Removed isLegacyLogDirectory code path and updated tests

andrewor14

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #10860 from felixcheung/historyserverformat.
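For readers comparing the two formats: before SPARK-2261 (Spark 1.3), an application's event log was a directory of several files; since then it is a single consolidated file. Based on the renames and deletions in this commit, the two layouts look roughly like this (illustrative, using one of the test apps below):

    Legacy (pre-1.3) layout, support removed here:
        spark-events/local-1422981759269/
            EVENT_LOG_1            <- the JSON event log itself
            SPARK_VERSION_1.2.0    <- empty marker recording the Spark version
            APPLICATION_COMPLETE   <- empty marker meaning the app finished

    Current layout, kept:
        spark-events/local-1422981759269
            (a single file; an ".inprogress" suffix marks a still-running app)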
-rw-r--r--  .rat-excludes | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 124
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 2
-rwxr-xr-x  core/src/test/resources/spark-events/local-1422981759269 (renamed from core/src/test/resources/spark-events/local-1422981759269/EVENT_LOG_1) | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1422981759269/APPLICATION_COMPLETE | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1422981759269/SPARK_VERSION_1.2.0 | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1422981780767 (renamed from core/src/test/resources/spark-events/local-1422981780767/EVENT_LOG_1) | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1422981780767/APPLICATION_COMPLETE | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1422981780767/SPARK_VERSION_1.2.0 | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1425081759269 (renamed from core/src/test/resources/spark-events/local-1425081759269/EVENT_LOG_1) | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1425081759269/APPLICATION_COMPLETE | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1425081759269/SPARK_VERSION_1.2.0 | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1426533911241 (renamed from core/src/test/resources/spark-events/local-1426533911241/EVENT_LOG_1) | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1426533911241/APPLICATION_COMPLETE | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1426533911241/SPARK_VERSION_1.2.0 | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1426633911242 (renamed from core/src/test/resources/spark-events/local-1426633911242/EVENT_LOG_1) | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1426633911242/APPLICATION_COMPLETE | 0
-rwxr-xr-x  core/src/test/resources/spark-events/local-1426633911242/SPARK_VERSION_1.2.0 | 0
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 95
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala | 25

20 files changed, 23 insertions(+), 235 deletions(-)
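The heart of the change is FsHistoryProvider's directory scan: with directory-based logs gone, the Option-returning getModificationTime helper can be dropped and timestamps read directly off the Hadoop FileStatus. A minimal sketch of the resulting filter-and-sort logic, mirroring the first hunk below (the method name newerLogs is ours, for illustration):

    import org.apache.hadoop.fs.FileStatus

    // Legacy log *directories* are skipped outright; the modification time
    // comes straight from the FileStatus rather than an Option[Long] helper.
    def newerLogs(statusList: Seq[FileStatus], lastScanTime: Long): Seq[FileStatus] =
      statusList
        .filter(entry => !entry.isDirectory() && entry.getModificationTime() >= lastScanTime)
        .sortWith((e1, e2) => e1.getModificationTime() >= e2.getModificationTime())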
diff --git a/.rat-excludes b/.rat-excludes
index 874a6ee9f4..8b5061415f 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -73,12 +73,12 @@ logs
.*dependency-reduced-pom.xml
known_translations
json_expectation
-local-1422981759269/*
-local-1422981780767/*
-local-1425081759269/*
-local-1426533911241/*
-local-1426633911242/*
-local-1430917381534/*
+local-1422981759269
+local-1422981780767
+local-1425081759269
+local-1426533911241
+local-1426633911242
+local-1430917381534
local-1430917381535_1
local-1430917381535_2
DESCRIPTION
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 22e4155cc5..9648959dba 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -248,9 +248,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
try {
- getModificationTime(entry).map { time =>
- time >= lastScanTime
- }.getOrElse(false)
+ !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime)
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -261,9 +259,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
.flatMap { entry => Some(entry) }
.sortWith { case (entry1, entry2) =>
- val mod1 = getModificationTime(entry1).getOrElse(-1L)
- val mod2 = getModificationTime(entry2).getOrElse(-1L)
- mod1 >= mod2
+ entry1.getModificationTime() >= entry2.getModificationTime()
}
logInfos.grouped(20)
@@ -341,19 +337,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
}.foreach { attempt =>
val logPath = new Path(logDir, attempt.logPath)
- // If this is a legacy directory, then add the directory to the zipStream and add
- // each file to that directory.
- if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
- val files = fs.listStatus(logPath)
- zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
- zipStream.closeEntry()
- files.foreach { file =>
- val path = file.getPath
- zipFileToStream(path, attempt.logPath + Path.SEPARATOR + path.getName, zipStream)
- }
- } else {
- zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
- }
+ zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
}
} finally {
zipStream.close()
@@ -527,12 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
- val logInput =
- if (isLegacyLogDirectory(eventLog)) {
- openLegacyEventLog(logPath)
- } else {
- EventLoggingListener.openEventLog(logPath, fs)
- }
+ val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
val appCompleted = isApplicationCompleted(eventLog)
@@ -540,9 +519,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
bus.replay(logInput, logPath.toString, !appCompleted)
// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
- // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
- // logs generated by those versions go through.
- if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+ // try to show their UI.
+ if (appListener.appId.isDefined) {
Some(new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
@@ -550,7 +528,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
- getModificationTime(eventLog).get,
+ eventLog.getModificationTime(),
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted))
} else {
@@ -562,90 +540,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
/**
- * Loads a legacy log directory. This assumes that the log directory contains a single event
- * log file (along with other metadata files), which is the case for directories generated by
- * the code in previous releases.
- *
- * @return input stream that holds one JSON record per line.
- */
- private[history] def openLegacyEventLog(dir: Path): InputStream = {
- val children = fs.listStatus(dir)
- var eventLogPath: Path = null
- var codecName: Option[String] = None
-
- children.foreach { child =>
- child.getPath().getName() match {
- case name if name.startsWith(LOG_PREFIX) =>
- eventLogPath = child.getPath()
- case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
- codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
- case _ =>
- }
- }
-
- if (eventLogPath == null) {
- throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
- }
-
- val codec = try {
- codecName.map { c => CompressionCodec.createCodec(conf, c) }
- } catch {
- case e: Exception =>
- throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
- }
-
- val in = new BufferedInputStream(fs.open(eventLogPath))
- codec.map(_.compressedInputStream(in)).getOrElse(in)
- }
-
- /**
- * Return whether the specified event log path contains a old directory-based event log.
- * Previously, the event log of an application comprises of multiple files in a directory.
- * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
- * See SPARK-2261 for more detail.
- */
- private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory
-
- /**
- * Returns the modification time of the given event log. If the status points at an empty
- * directory, `None` is returned, indicating that there isn't an event log at that location.
- */
- private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
- if (isLegacyLogDirectory(fsEntry)) {
- val statusList = fs.listStatus(fsEntry.getPath)
- if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
- } else {
- Some(fsEntry.getModificationTime())
- }
- }
-
- /**
* Return true when the application has completed.
*/
private def isApplicationCompleted(entry: FileStatus): Boolean = {
- if (isLegacyLogDirectory(entry)) {
- fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
- } else {
- !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
- }
- }
-
- /**
- * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
- * in Spark 1.1.
- */
- private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
- if (isLegacyLogDirectory(entry)) {
- fs.listStatus(entry.getPath())
- .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
- .map { status =>
- val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
- version != "1.0" && version != "1.1"
- }
- .getOrElse(true)
- } else {
- true
- }
+ !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
}
/**
@@ -670,12 +568,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
- // Constants used to parse Spark 1.0.0 log directories.
- val LOG_PREFIX = "EVENT_LOG_"
- val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
- val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
- val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
}
private class FsApplicationAttemptInfo(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 36f2b74f94..01fee46e73 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -232,8 +232,6 @@ private[spark] object EventLoggingListener extends Logging {
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"
- val SPARK_VERSION_KEY = "SPARK_VERSION"
- val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
diff --git a/core/src/test/resources/spark-events/local-1422981759269/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1422981759269
index 4794e56d11..4794e56d11 100755
--- a/core/src/test/resources/spark-events/local-1422981759269/EVENT_LOG_1
+++ b/core/src/test/resources/spark-events/local-1422981759269
diff --git a/core/src/test/resources/spark-events/local-1422981759269/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1422981759269/APPLICATION_COMPLETE
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1422981759269/APPLICATION_COMPLETE
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1422981759269/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1422981759269/SPARK_VERSION_1.2.0
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1422981759269/SPARK_VERSION_1.2.0
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1422981780767/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1422981780767
index f14a000bf2..f14a000bf2 100755
--- a/core/src/test/resources/spark-events/local-1422981780767/EVENT_LOG_1
+++ b/core/src/test/resources/spark-events/local-1422981780767
diff --git a/core/src/test/resources/spark-events/local-1422981780767/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1422981780767/APPLICATION_COMPLETE
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1422981780767/APPLICATION_COMPLETE
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1422981780767/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1422981780767/SPARK_VERSION_1.2.0
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1422981780767/SPARK_VERSION_1.2.0
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1425081759269/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1425081759269
index 9745b36b09..9745b36b09 100755
--- a/core/src/test/resources/spark-events/local-1425081759269/EVENT_LOG_1
+++ b/core/src/test/resources/spark-events/local-1425081759269
diff --git a/core/src/test/resources/spark-events/local-1425081759269/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1425081759269/APPLICATION_COMPLETE
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1425081759269/APPLICATION_COMPLETE
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1425081759269/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1425081759269/SPARK_VERSION_1.2.0
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1425081759269/SPARK_VERSION_1.2.0
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1426533911241/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1426533911241
index 9ef5bd5d92..9ef5bd5d92 100755
--- a/core/src/test/resources/spark-events/local-1426533911241/EVENT_LOG_1
+++ b/core/src/test/resources/spark-events/local-1426533911241
diff --git a/core/src/test/resources/spark-events/local-1426533911241/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1426533911241/APPLICATION_COMPLETE
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1426533911241/APPLICATION_COMPLETE
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1426533911241/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1426533911241/SPARK_VERSION_1.2.0
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1426533911241/SPARK_VERSION_1.2.0
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1426633911242/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1426633911242
index e704328210..e704328210 100755
--- a/core/src/test/resources/spark-events/local-1426633911242/EVENT_LOG_1
+++ b/core/src/test/resources/spark-events/local-1426633911242
diff --git a/core/src/test/resources/spark-events/local-1426633911242/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1426633911242/APPLICATION_COMPLETE
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1426633911242/APPLICATION_COMPLETE
+++ /dev/null
diff --git a/core/src/test/resources/spark-events/local-1426633911242/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1426633911242/SPARK_VERSION_1.2.0
deleted file mode 100755
index e69de29bb2..0000000000
--- a/core/src/test/resources/spark-events/local-1426633911242/SPARK_VERSION_1.2.0
+++ /dev/null
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 6cbf911395..3baa2e2dda 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -69,7 +69,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
new File(logPath)
}
- test("Parse new and old application logs") {
+ test("Parse application logs") {
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
@@ -95,26 +95,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
None)
)
- // Write an old-style application log.
- val oldAppComplete = writeOldLog("old1", "1.0", None, true,
- SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
- SparkListenerApplicationEnd(3L)
- )
-
- // Check for logs so that we force the older unfinished app to be loaded, to make
- // sure unfinished apps are also sorted correctly.
- provider.checkForLogs()
-
- // Write an unfinished app, old-style.
- val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
- SparkListenerApplicationStart("old2", None, 2L, "test", None)
- )
-
- // Force a reload of data from the log directory, and check that both logs are loaded.
+ // Force a reload of data from the log directory, and check that logs are loaded.
// Take the opportunity to check that the offset checks work as expected.
updateAndCheck(provider) { list =>
- list.size should be (5)
- list.count(_.attempts.head.completed) should be (3)
+ list.size should be (3)
+ list.count(_.attempts.head.completed) should be (2)
def makeAppInfo(
id: String,
@@ -132,11 +117,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
- list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
- oldAppComplete.lastModified(), "test", true))
- list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
- -1L, oldAppIncomplete.lastModified(), "test", false))
- list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
+ list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
@@ -148,38 +129,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
- test("Parse legacy logs with compression codec set") {
- val provider = new FsHistoryProvider(createTestConf())
- val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
- (classOf[SnappyCompressionCodec].getName(), true),
- ("invalid.codec", false))
-
- testCodecs.foreach { case (codecName, valid) =>
- val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
- val logDir = new File(testDir, codecName)
- logDir.mkdir()
- createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
- writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
- SparkListenerApplicationStart("app2", None, 2L, "test", None),
- SparkListenerApplicationEnd(3L)
- )
- createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
-
- val logPath = new Path(logDir.getAbsolutePath())
- try {
- val logInput = provider.openLegacyEventLog(logPath)
- try {
- Source.fromInputStream(logInput).getLines().toSeq.size should be (2)
- } finally {
- logInput.close()
- }
- } catch {
- case e: IllegalArgumentException =>
- valid should be (false)
- }
- }
- }
-
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
@@ -395,21 +344,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
SparkListenerLogStart("1.4")
)
- // Write a 1.2 log file with no start event (= no app id), it should be ignored.
- writeOldLog("v12Log", "1.2", None, false)
-
- // Write 1.0 and 1.1 logs, which don't have app ids.
- writeOldLog("v11Log", "1.1", None, true,
- SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
- SparkListenerApplicationEnd(3L))
- writeOldLog("v10Log", "1.0", None, true,
- SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
- SparkListenerApplicationEnd(4L))
-
updateAndCheck(provider) { list =>
- list.size should be (2)
- list(0).id should be ("v10Log")
- list(1).id should be ("v11Log")
+ list.size should be (0)
}
}
@@ -499,25 +435,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
}
- private def writeOldLog(
- fname: String,
- sparkVersion: String,
- codec: Option[CompressionCodec],
- completed: Boolean,
- events: SparkListenerEvent*): File = {
- val log = new File(testDir, fname)
- log.mkdir()
-
- val oldEventLog = new File(log, LOG_PREFIX + "1")
- createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
- writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*)
- if (completed) {
- createEmptyFile(new File(log, APPLICATION_COMPLETE))
- }
-
- log
- }
-
private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
extends FsHistoryProvider(conf, clock) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index be55b2e0fe..40d0076eec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -176,18 +176,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
(1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) }
}
- test("download legacy logs - all attempts") {
- doDownloadTest("local-1426533911241", None, legacy = true)
- }
-
- test("download legacy logs - single attempts") {
- (1 to 2). foreach {
- attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true)
- }
- }
-
// Test that the files are downloaded correctly, and validate them.
- def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
+ def doDownloadTest(appId: String, attemptId: Option[Int]): Unit = {
val url = attemptId match {
case Some(id) =>
@@ -205,22 +195,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
var entry = zipStream.getNextEntry
entry should not be null
val totalFiles = {
- if (legacy) {
- attemptId.map { x => 3 }.getOrElse(6)
- } else {
- attemptId.map { x => 1 }.getOrElse(2)
- }
+ attemptId.map { x => 1 }.getOrElse(2)
}
var filesCompared = 0
while (entry != null) {
if (!entry.isDirectory) {
val expectedFile = {
- if (legacy) {
- val splits = entry.getName.split("/")
- new File(new File(logDir, splits(0)), splits(1))
- } else {
- new File(logDir, entry.getName)
- }
+ new File(logDir, entry.getName)
}
val expected = Files.toString(expectedFile, Charsets.UTF_8)
val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)