aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-03-02 16:34:32 -0800
committerPatrick Wendell <patrick@databricks.com>2015-03-02 16:34:32 -0800
commit6776cb33ea691f7843b956b3e80979282967e826 (patch)
tree8d8faa5a2fc5cf7c882320ae997632cad833a243
parent1a49496b4a9df40c74739fc0fb8a21c88a477075 (diff)
downloadspark-6776cb33ea691f7843b956b3e80979282967e826.tar.gz
spark-6776cb33ea691f7843b956b3e80979282967e826.tar.bz2
spark-6776cb33ea691f7843b956b3e80979282967e826.zip
[SPARK-6066] Make event log format easier to parse
Some users have reported difficulty in parsing the new event log format. Since we embed the metadata in the beginning of the file, when we compress the event log we need to skip the metadata because we need that information to parse the log later. This means we'll end up with a partially compressed file if event logging compression is turned on. The old format looks like: ``` sparkVersion = 1.3.0 compressionCodec = org.apache.spark.io.LZFCompressionCodec === LOG_HEADER_END === // actual events, could be compressed bytes ``` The new format in this patch puts the compression codec in the log file name instead. It also removes the metadata header altogether along with the Spark version, which was not needed. The new file name looks something like: ``` app_without_compression app_123.lzf app_456.snappy ``` I tested this with and without compression, using different compression codecs and event logging directories. I verified that both the `Master` and the `HistoryServer` can render both compressed and uncompressed logs as before. Author: Andrew Or <andrew@databricks.com> Closes #4821 from andrewor14/event-log-format and squashes the following commits: 8511141 [Andrew Or] Fix test 654883d [Andrew Or] Add back metadata with Spark version 7f537cd [Andrew Or] Address review feedback 7d6aa61 [Andrew Or] Make codec an extension 59abee9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-format 27c9a6c [Andrew Or] Address review feedback 519e51a [Andrew Or] Address review feedback ef69276 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-format 88a091d [Andrew Or] Add tests for new format and file name f32d8d2 [Andrew Or] Fix tests 8db5a06 [Andrew Or] Embed metadata in the event log file name instead
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala162
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala69
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala62
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala13
14 files changed, 212 insertions, 189 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cd0c218a3..e231e8369d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
FixedLengthBinaryInputFormat}
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
None
}
}
+ private[spark] val eventLogCodec: Option[String] = {
+ val compress = conf.getBoolean("spark.eventLog.compress", false)
+ if (compress && isEventLogEnabled) {
+ Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
+ } else {
+ None
+ }
+ }
// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index ae55b4ff40..3d0d68de8f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
- val eventLogDir: Option[String] = None)
+ val eventLogDir: Option[String] = None,
+ // short name of compression codec used when writing event logs, if any (e.g. lzf)
+ val eventLogCodec: Option[String] = None)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,10 @@ private[spark] class ApplicationDescription(
memoryPerSlave: Int = memoryPerSlave,
command: Command = command,
appUiUrl: String = appUiUrl,
- eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
- new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+ eventLogDir: Option[String] = eventLogDir,
+ eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+ new ApplicationDescription(
+ name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
override def toString: String = "ApplicationDescription(" + name + ")"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c5fab1d440..16d88c17d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,8 +83,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
- private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
- private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+ private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+ private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
/**
@@ -324,7 +324,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
- val (logInput, sparkVersion) =
+ val logInput =
if (isLegacyLogDirectory(eventLog)) {
openLegacyEventLog(logPath)
} else {
@@ -333,7 +333,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val appListener = new ApplicationEventListener
bus.addListener(appListener)
- bus.replay(logInput, sparkVersion, logPath.toString)
+ bus.replay(logInput, logPath.toString)
new FsApplicationHistoryInfo(
logPath.getName(),
appListener.appId.getOrElse(logPath.getName()),
@@ -353,30 +353,24 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* log file (along with other metadata files), which is the case for directories generated by
* the code in previous releases.
*
- * @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
+ * @return input stream that holds one JSON record per line.
*/
- private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
+ private[history] def openLegacyEventLog(dir: Path): InputStream = {
val children = fs.listStatus(dir)
var eventLogPath: Path = null
var codecName: Option[String] = None
- var sparkVersion: String = null
children.foreach { child =>
child.getPath().getName() match {
case name if name.startsWith(LOG_PREFIX) =>
eventLogPath = child.getPath()
-
case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
-
- case version if version.startsWith(SPARK_VERSION_PREFIX) =>
- sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
-
case _ =>
}
}
- if (eventLogPath == null || sparkVersion == null) {
+ if (eventLogPath == null) {
throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
}
@@ -388,7 +382,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
val in = new BufferedInputStream(fs.open(eventLogPath))
- (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+ codec.map(_.compressedInputStream(in)).getOrElse(in)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 8cc6ec1e81..148485cc11 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -737,13 +737,13 @@ private[spark] class Master(
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
try {
val eventLogFile = app.desc.eventLogDir
- .map { dir => EventLoggingListener.getLogPath(dir, app.id) }
+ .map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) }
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
return false
}
-
+
val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
@@ -756,12 +756,12 @@ private[spark] class Master(
return false
}
- val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
+ val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
try {
- replayBus.replay(logInput, sparkVersion, eventLogFile)
+ replayBus.replay(logInput, eventLogFile)
} finally {
logInput.close()
}
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index f856890d27..0709b6d689 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -26,7 +26,6 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
-import org.apache.spark.Logging
/**
* :: DeveloperApi ::
@@ -53,8 +52,12 @@ private[spark] object CompressionCodec {
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
+ def getCodecName(conf: SparkConf): String = {
+ conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
+ }
+
def createCodec(conf: SparkConf): CompressionCodec = {
- createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
+ createCodec(conf, getCodecName(conf))
}
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -71,6 +74,20 @@ private[spark] object CompressionCodec {
s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
}
+ /**
+ * Return the short version of the given codec name.
+ * If it is already a short name, just return it.
+ */
+ def getShortName(codecName: String): String = {
+ if (shortCompressionCodecNames.contains(codecName)) {
+ codecName
+ } else {
+ shortCompressionCodecNames
+ .collectFirst { case (k, v) if v == codecName => k }
+ .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+ }
+ }
+
val FALLBACK_COMPRESSION_CODEC = "lzf"
val DEFAULT_COMPRESSION_CODEC = "snappy"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 30075c172b..2091a9fe8d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -62,6 +62,15 @@ private[spark] class EventLoggingListener(
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+ private val compressionCodec =
+ if (shouldCompress) {
+ Some(CompressionCodec.createCodec(sparkConf))
+ } else {
+ None
+ }
+ private val compressionCodecName = compressionCodec.map { c =>
+ CompressionCodec.getShortName(c.getClass.getName)
+ }
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -80,7 +89,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
// Visible for tests only.
- private[scheduler] val logPath = getLogPath(logBaseDir, appId)
+ private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
/**
* Creates the log file in the configured log directory.
@@ -111,19 +120,19 @@ private[spark] class EventLoggingListener(
hadoopDataStream.get
}
- val compressionCodec =
- if (shouldCompress) {
- Some(CompressionCodec.createCodec(sparkConf))
- } else {
- None
- }
-
- fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
- val logStream = initEventLog(new BufferedOutputStream(dstream, outputBufferSize),
- compressionCodec)
- writer = Some(new PrintWriter(logStream))
+ try {
+ val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+ val bstream = new BufferedOutputStream(cstream, outputBufferSize)
- logInfo("Logging events to %s".format(logPath))
+ EventLoggingListener.initEventLog(bstream)
+ fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
+ writer = Some(new PrintWriter(bstream))
+ logInfo("Logging events to %s".format(logPath))
+ } catch {
+ case e: Exception =>
+ dstream.close()
+ throw e
+ }
}
/** Log the event as JSON. */
@@ -201,77 +210,57 @@ private[spark] object EventLoggingListener extends Logging {
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"
+ val SPARK_VERSION_KEY = "SPARK_VERSION"
+ val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
- // Marker for the end of header data in a log file. After this marker, log data, potentially
- // compressed, will be found.
- private val HEADER_END_MARKER = "=== LOG_HEADER_END ==="
-
- // To avoid corrupted files causing the heap to fill up. Value is arbitrary.
- private val MAX_HEADER_LINE_LENGTH = 4096
-
// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
/**
- * Write metadata about the event log to the given stream.
- *
- * The header is a serialized version of a map, except it does not use Java serialization to
- * avoid incompatibilities between different JDKs. It writes one map entry per line, in
- * "key=value" format.
+ * Write metadata about an event log to the given stream.
+ * The metadata is encoded in the first line of the event log as JSON.
*
- * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code
- * can know when to stop.
- *
- * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot
- * change in new Spark versions without some other way of detecting the change (like some
- * metadata encoded in the file name).
- *
- * @param logStream Raw output stream to the even log file.
- * @param compressionCodec Optional compression codec to use.
- * @return A stream where to write event log data. This may be a wrapper around the original
- * stream (for example, when compression is enabled).
+ * @param logStream Raw output stream to the event log file.
*/
- def initEventLog(
- logStream: OutputStream,
- compressionCodec: Option[CompressionCodec]): OutputStream = {
- val meta = mutable.HashMap(("version" -> SPARK_VERSION))
- compressionCodec.foreach { codec =>
- meta += ("compressionCodec" -> codec.getClass().getName())
- }
-
- def write(entry: String) = {
- val bytes = entry.getBytes(Charsets.UTF_8)
- if (bytes.length > MAX_HEADER_LINE_LENGTH) {
- throw new IOException(s"Header entry too long: ${entry}")
- }
- logStream.write(bytes, 0, bytes.length)
- }
-
- meta.foreach { case (k, v) => write(s"$k=$v\n") }
- write(s"$HEADER_END_MARKER\n")
- compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
+ def initEventLog(logStream: OutputStream): Unit = {
+ val metadata = SparkListenerLogStart(SPARK_VERSION)
+ val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n"
+ logStream.write(metadataJson.getBytes(Charsets.UTF_8))
}
/**
* Return a file-system-safe path to the log file for the given application.
*
+ * Note that because we currently only create a single log file for each application,
+ * we must encode all the information needed to parse this event log in the file name
+ * instead of within the file itself. Otherwise, if the file is compressed, for instance,
+ * we won't know which codec to use to decompress the metadata needed to open the file in
+ * the first place.
+ *
* @param logBaseDir Directory where the log file will be written.
* @param appId A unique app ID.
+ * @param compressionCodecName Name to identify the codec used to compress the contents
+ * of the log, or None if compression is not enabled.
* @return A path which consists of file-system-safe characters.
*/
- def getLogPath(logBaseDir: String, appId: String): String = {
- val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
- Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+ def getLogPath(
+ logBaseDir: String,
+ appId: String,
+ compressionCodecName: Option[String] = None): String = {
+ val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+ // e.g. app_123, app_123.lzf
+ val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
+ Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
}
/**
- * Opens an event log file and returns an input stream to the event data.
+ * Opens an event log file and returns an input stream that contains the event data.
*
- * @return 2-tuple (event input stream, Spark version of event data)
+ * @return input stream that holds one JSON record per line.
*/
- def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
+ def openEventLog(log: Path, fs: FileSystem): InputStream = {
// It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
// IOException when a file does not exist, so try our best to throw a proper exception.
if (!fs.exists(log)) {
@@ -279,52 +268,17 @@ private[spark] object EventLoggingListener extends Logging {
}
val in = new BufferedInputStream(fs.open(log))
- // Read a single line from the input stream without buffering.
- // We cannot use BufferedReader because we must avoid reading
- // beyond the end of the header, after which the content of the
- // file may be compressed.
- def readLine(): String = {
- val bytes = new ByteArrayOutputStream()
- var next = in.read()
- var count = 0
- while (next != '\n') {
- if (next == -1) {
- throw new IOException("Unexpected end of file.")
- }
- bytes.write(next)
- count = count + 1
- if (count > MAX_HEADER_LINE_LENGTH) {
- throw new IOException("Maximum header line length exceeded.")
- }
- next = in.read()
- }
- new String(bytes.toByteArray(), Charsets.UTF_8)
+
+ // Compression codec is encoded as an extension, e.g. app_123.lzf
+ // Since we sanitize the app ID to not include periods, it is safe to split on it
+ val logName = log.getName.stripSuffix(IN_PROGRESS)
+ val codecName: Option[String] = logName.split("\\.").tail.lastOption
+ val codec = codecName.map { c =>
+ codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
}
- // Parse the header metadata in the form of k=v pairs
- // This assumes that every line before the header end marker follows this format
try {
- val meta = new mutable.HashMap[String, String]()
- var foundEndMarker = false
- while (!foundEndMarker) {
- readLine() match {
- case HEADER_END_MARKER =>
- foundEndMarker = true
- case entry =>
- val prop = entry.split("=", 2)
- if (prop.length != 2) {
- throw new IllegalArgumentException("Invalid metadata in log file.")
- }
- meta += (prop(0) -> prop(1))
- }
- }
-
- val sparkVersion = meta.get("version").getOrElse(
- throw new IllegalArgumentException("Missing Spark version in log metadata."))
- val codec = meta.get("compressionCodec").map { codecName =>
- codecMap.getOrElseUpdate(codecName, CompressionCodec.createCodec(new SparkConf, codecName))
- }
- (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+ codec.map(_.compressedInputStream(in)).getOrElse(in)
} catch {
case e: Exception =>
in.close()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d9c3a10dc5..95273c716b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -39,10 +39,9 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
* error is thrown by this method.
*
* @param logData Stream containing event log data.
- * @param version Spark version that generated the events.
* @param sourceName Filename (or other source identifier) from whence @logData is being read
*/
- def replay(logData: InputStream, version: String, sourceName: String) {
+ def replay(logData: InputStream, sourceName: String): Unit = {
var currentLine: String = null
var lineNumber: Int = 1
try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index dd28ddb31d..52720d48ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -116,6 +116,11 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String],
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+/**
+ * An internal class that describes the metadata of an event log.
+ * This event is not meant to be posted to listeners downstream.
+ */
+private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
/**
* :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index fe8a19a2c0..61e69ecc08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -58,6 +58,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
+ case logStart: SparkListenerLogStart => // ignore event log metadata
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a0aa555f62..ffd4825705 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -85,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend(
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- appUIAddress, sc.eventLogDir)
+ appUIAddress, sc.eventLogDir, sc.eventLogCodec)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e20864db5..474f79fb75 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -89,6 +89,8 @@ private[spark] object JsonProtocol {
executorAddedToJson(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
executorRemovedToJson(executorRemoved)
+ case logStart: SparkListenerLogStart =>
+ logStartToJson(logStart)
// These aren't used, but keeps compiler happy
case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
}
@@ -214,6 +216,11 @@ private[spark] object JsonProtocol {
("Removed Reason" -> executorRemoved.reason)
}
+ def logStartToJson(logStart: SparkListenerLogStart): JValue = {
+ ("Event" -> Utils.getFormattedClassName(logStart)) ~
+ ("Spark Version" -> SPARK_VERSION)
+ }
+
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
* -------------------------------------------------------------------- */
@@ -447,6 +454,7 @@ private[spark] object JsonProtocol {
val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
+ val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -464,6 +472,7 @@ private[spark] object JsonProtocol {
case `applicationEnd` => applicationEndFromJson(json)
case `executorAdded` => executorAddedFromJson(json)
case `executorRemoved` => executorRemovedFromJson(json)
+ case `logStart` => logStartFromJson(json)
}
}
@@ -574,6 +583,11 @@ private[spark] object JsonProtocol {
SparkListenerExecutorRemoved(time, executorId, reason)
}
+ def logStartFromJson(json: JValue): SparkListenerLogStart = {
+ val sparkVersion = (json \ "Spark Version").extract[String]
+ SparkListenerLogStart(sparkVersion)
+ }
+
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
* ---------------------------------------------------------------------- */
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 85939eaadc..e908ba604e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,18 +17,17 @@
package org.apache.spark.deploy.history
-import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.net.URI
import scala.io.Source
-import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.Matchers
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io._
import org.apache.spark.scheduler._
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -45,18 +44,35 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
Utils.deleteRecursively(testDir)
}
+ /** Create a fake log file using the new log format used in Spark 1.3+ */
+ private def newLogFile(
+ appId: String,
+ inProgress: Boolean,
+ codec: Option[String] = None): File = {
+ val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
+ val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
+ val logPath = new URI(logUri).getPath + ip
+ new File(logPath)
+ }
+
test("Parse new and old application logs") {
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val newAppComplete = new File(testDir, "new1")
+ val newAppComplete = newLogFile("new1", inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
- SparkListenerApplicationEnd(4L)
+ SparkListenerApplicationEnd(5L)
)
+ // Write a new-style application log.
+ val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+ writeFile(newAppCompressedComplete, true, None,
+ SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+ SparkListenerApplicationEnd(4L))
+
// Write an unfinished app, new-style.
- val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+ val newAppIncomplete = newLogFile("new2", inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
)
@@ -89,16 +105,18 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val list = provider.getListing().toSeq
list should not be (null)
- list.size should be (4)
- list.count(e => e.completed) should be (2)
+ list.size should be (5)
+ list.count(_.completed) should be (3)
- list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
+ list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
+ "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+ list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
+ list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
-1L, oldAppIncomplete.lastModified(), "test", false))
- list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
+ list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
-1L, newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
@@ -127,7 +145,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val logPath = new Path(logDir.getAbsolutePath())
try {
- val (logInput, sparkVersion) = provider.openLegacyEventLog(logPath)
+ val logInput = provider.openLegacyEventLog(logPath)
try {
Source.fromInputStream(logInput).getLines().toSeq.size should be (2)
} finally {
@@ -141,12 +159,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-3697: ignore directories that cannot be read.") {
- val logFile1 = new File(testDir, "new1")
+ val logFile1 = newLogFile("new1", inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test"),
SparkListenerApplicationEnd(2L)
)
- val logFile2 = new File(testDir, "new2")
+ val logFile2 = newLogFile("new2", inProgress = false)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", None, 1L, "test"),
SparkListenerApplicationEnd(2L)
@@ -164,7 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+ val logFile1 = newLogFile("app1", inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationEnd(2L)
@@ -174,7 +192,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListBeforeRename.size should be (1)
appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
- logFile1.renameTo(new File(testDir, "app1"))
+ logFile1.renameTo(newLogFile("app1", inProgress = false))
provider.checkForLogs()
val appListAfterRename = provider.getListing()
appListAfterRename.size should be (1)
@@ -184,7 +202,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+ val logFile1 = newLogFile("app1", inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationEnd(2L))
@@ -199,14 +217,13 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
- val out =
- if (isNewFormat) {
- EventLoggingListener.initEventLog(new FileOutputStream(file), codec)
- } else {
- val fileStream = new FileOutputStream(file)
- codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream)
- }
- val writer = new OutputStreamWriter(out, "UTF-8")
+ val fstream = new FileOutputStream(file)
+ val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+ val bstream = new BufferedOutputStream(cstream)
+ if (isNewFormat) {
+ EventLoggingListener.initEventLog(new FileOutputStream(file))
+ }
+ val writer = new OutputStreamWriter(bstream, "UTF-8")
try {
events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
} finally {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 437d8693c0..992dde66f9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import java.io.{File, FileOutputStream, InputStream, IOException}
+import java.net.URI
import scala.collection.mutable
import scala.io.Source
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io._
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -78,7 +79,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
test("Basic event logging with compression") {
CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
- testEventLogging(compressionCodec = Some(codec))
+ testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
}
}
@@ -88,25 +89,35 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
test("End-to-end event logging with compression") {
CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
- testApplicationEventLogging(compressionCodec = Some(codec))
+ testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
}
}
test("Log overwriting") {
- val log = new FileOutputStream(new File(testDir, "test"))
- log.close()
- try {
- testEventLogging()
- assert(false)
- } catch {
- case e: IOException =>
- // Expected, since we haven't enabled log overwrite.
- }
-
+ val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+ val logPath = new URI(logUri).getPath
+ // Create file before writing the event log
+ new FileOutputStream(new File(logPath)).close()
+ // Expected IOException, since we haven't enabled log overwrite.
+ intercept[IOException] { testEventLogging() }
// Try again, but enable overwriting.
testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true"))
}
+ test("Event log name") {
+ // without compression
+ assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
+ // with compression
+ assert(s"file:/base-dir/app1.lzf" ===
+ EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
+ // illegal characters in app ID
+ assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
+ EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
+ // illegal characters in app ID with compression
+ assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
+ EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
+ }
+
/* ----------------- *
* Actual test logic *
* ----------------- */
@@ -140,15 +151,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
eventLogger.stop()
// Verify file contains exactly the two events logged
- val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
- fileSystem)
+ val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
try {
val lines = readLines(logData)
- assert(lines.size === 2)
- assert(lines(0).contains("SparkListenerApplicationStart"))
- assert(lines(1).contains("SparkListenerApplicationEnd"))
- assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
- assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+ val logStart = SparkListenerLogStart(SPARK_VERSION)
+ assert(lines.size === 3)
+ assert(lines(0).contains("SparkListenerLogStart"))
+ assert(lines(1).contains("SparkListenerApplicationStart"))
+ assert(lines(2).contains("SparkListenerApplicationEnd"))
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
} finally {
logData.close()
}
@@ -163,8 +176,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
+ val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI().toString()
- assert(eventLogger.logPath.startsWith(expectedLogDir + "/"))
+ assert(eventLogPath === EventLoggingListener.getLogPath(
+ expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
@@ -178,8 +193,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
eventExistenceListener.assertAllCallbacksInvoked()
// Make sure expected events exist in the log file.
- val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
- fileSystem)
+ val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+ val logStart = SparkListenerLogStart(SPARK_VERSION)
val lines = readLines(logData)
val eventSet = mutable.Set(
SparkListenerApplicationStart,
@@ -204,6 +219,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
}
}
}
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 702c4cb3bd..601694f57a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
- replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
+ replayer.replay(logData, logFilePath.toString)
} finally {
logData.close()
}
@@ -115,12 +115,12 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
assert(!eventLog.isDir)
// Replay events
- val (logData, version) = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
+ val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
val eventMonster = new EventMonster(conf)
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
- replayer.replay(logData, version, eventLog.getPath().toString)
+ replayer.replay(logData, eventLog.getPath().toString)
} finally {
logData.close()
}
@@ -150,11 +150,4 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
override def start() { }
}
-
- private def getCompressionCodec(codecName: String) = {
- val conf = new SparkConf
- conf.set("spark.io.compression.codec", codecName)
- CompressionCodec.createCodec(conf)
- }
-
}