-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/io/CompressionCodec.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 162
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 69
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 62
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 13
14 files changed, 212 insertions(+), 189 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cd0c218a3..e231e8369d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
FixedLengthBinaryInputFormat}
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
None
}
}
+ private[spark] val eventLogCodec: Option[String] = {
+ val compress = conf.getBoolean("spark.eventLog.compress", false)
+ if (compress && isEventLogEnabled) {
+ Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
+ } else {
+ None
+ }
+ }
// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
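
For context, a hedged sketch of the configuration that exercises the new eventLogCodec field; the config keys come from this hunk and from CompressionCodec below, while the object wrapper and the chosen values are only illustrative:

    import org.apache.spark.SparkConf

    object EventLogCompressionConfExample {
      // With these settings, SparkContext.eventLogCodec resolves to Some("lzf") and the
      // event log file name gains a ".lzf" extension (see the EventLoggingListener hunk below).
      val conf: SparkConf = new SparkConf()
        .setAppName("event-log-compression-example")
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.compress", "true")
        .set("spark.io.compression.codec", "lzf")
    }
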
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index ae55b4ff40..3d0d68de8f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
- val eventLogDir: Option[String] = None)
+ val eventLogDir: Option[String] = None,
+ // short name of compression codec used when writing event logs, if any (e.g. lzf)
+ val eventLogCodec: Option[String] = None)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,10 @@ private[spark] class ApplicationDescription(
memoryPerSlave: Int = memoryPerSlave,
command: Command = command,
appUiUrl: String = appUiUrl,
- eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
- new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+ eventLogDir: Option[String] = eventLogDir,
+ eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+ new ApplicationDescription(
+ name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
override def toString: String = "ApplicationDescription(" + name + ")"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c5fab1d440..16d88c17d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,8 +83,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
- private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
- private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+ private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+ private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
/**
@@ -324,7 +324,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
- val (logInput, sparkVersion) =
+ val logInput =
if (isLegacyLogDirectory(eventLog)) {
openLegacyEventLog(logPath)
} else {
@@ -333,7 +333,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val appListener = new ApplicationEventListener
bus.addListener(appListener)
- bus.replay(logInput, sparkVersion, logPath.toString)
+ bus.replay(logInput, logPath.toString)
new FsApplicationHistoryInfo(
logPath.getName(),
appListener.appId.getOrElse(logPath.getName()),
@@ -353,30 +353,24 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* log file (along with other metadata files), which is the case for directories generated by
* the code in previous releases.
*
- * @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
+ * @return input stream that holds one JSON record per line.
*/
- private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
+ private[history] def openLegacyEventLog(dir: Path): InputStream = {
val children = fs.listStatus(dir)
var eventLogPath: Path = null
var codecName: Option[String] = None
- var sparkVersion: String = null
children.foreach { child =>
child.getPath().getName() match {
case name if name.startsWith(LOG_PREFIX) =>
eventLogPath = child.getPath()
-
case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
-
- case version if version.startsWith(SPARK_VERSION_PREFIX) =>
- sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
-
case _ =>
}
}
- if (eventLogPath == null || sparkVersion == null) {
+ if (eventLogPath == null) {
throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
}
@@ -388,7 +382,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
val in = new BufferedInputStream(fs.open(eventLogPath))
- (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+ codec.map(_.compressedInputStream(in)).getOrElse(in)
}
/**
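
For reference, a standalone sketch of how a legacy (pre-1.3) log directory is picked apart now that the SPARK_VERSION_ file is ignored; the prefixes match the constants above, but the parse helper itself is illustrative, not code from this patch:

    object LegacyLogDirSketch {
      private val LogPrefix = "EVENT_LOG_"
      private val CodecPrefix = "COMPRESSION_CODEC_"

      // Picks the event log file and the optional codec name out of a legacy directory listing.
      def parse(fileNames: Seq[String]): (String, Option[String]) = {
        val eventLog = fileNames.find(_.startsWith(LogPrefix)).getOrElse(
          throw new IllegalArgumentException("not a Spark application log directory"))
        val codec = fileNames.find(_.startsWith(CodecPrefix)).map(_.stripPrefix(CodecPrefix))
        (eventLog, codec)
      }
    }

    // e.g. LegacyLogDirSketch.parse(Seq("EVENT_LOG_1", "COMPRESSION_CODEC_lzf", "APPLICATION_COMPLETE"))
    //        == ("EVENT_LOG_1", Some("lzf"))
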
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 8cc6ec1e81..148485cc11 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -737,13 +737,13 @@ private[spark] class Master(
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
try {
val eventLogFile = app.desc.eventLogDir
- .map { dir => EventLoggingListener.getLogPath(dir, app.id) }
+ .map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) }
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
return false
}
-
+
val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
@@ -756,12 +756,12 @@ private[spark] class Master(
return false
}
- val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
+ val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
try {
- replayBus.replay(logInput, sparkVersion, eventLogFile)
+ replayBus.replay(logInput, eventLogFile)
} finally {
logInput.close()
}
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index f856890d27..0709b6d689 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -26,7 +26,6 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
-import org.apache.spark.Logging
/**
* :: DeveloperApi ::
@@ -53,8 +52,12 @@ private[spark] object CompressionCodec {
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
+ def getCodecName(conf: SparkConf): String = {
+ conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
+ }
+
def createCodec(conf: SparkConf): CompressionCodec = {
- createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
+ createCodec(conf, getCodecName(conf))
}
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -71,6 +74,20 @@ private[spark] object CompressionCodec {
s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
}
+ /**
+ * Return the short version of the given codec name.
+ * If it is already a short name, just return it.
+ */
+ def getShortName(codecName: String): String = {
+ if (shortCompressionCodecNames.contains(codecName)) {
+ codecName
+ } else {
+ shortCompressionCodecNames
+ .collectFirst { case (k, v) if v == codecName => k }
+ .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+ }
+ }
+
val FALLBACK_COMPRESSION_CODEC = "lzf"
val DEFAULT_COMPRESSION_CODEC = "snappy"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
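
A standalone mirror of the new getShortName contract, to show both accepted inputs; the codec class names are the ones registered in shortCompressionCodecNames above, and the object wrapper is illustrative:

    object ShortNameSketch {
      // Same entries as CompressionCodec.shortCompressionCodecNames in the hunk above.
      private val shortCompressionCodecNames = Map(
        "lz4" -> "org.apache.spark.io.LZ4CompressionCodec",
        "lzf" -> "org.apache.spark.io.LZFCompressionCodec",
        "snappy" -> "org.apache.spark.io.SnappyCompressionCodec")

      // Accepts either a short name or a fully qualified class name.
      def getShortName(codecName: String): String = {
        if (shortCompressionCodecNames.contains(codecName)) {
          codecName
        } else {
          shortCompressionCodecNames
            .collectFirst { case (k, v) if v == codecName => k }
            .getOrElse(throw new IllegalArgumentException(s"No short name for codec $codecName."))
        }
      }

      def main(args: Array[String]): Unit = {
        assert(getShortName("lzf") == "lzf")
        assert(getShortName("org.apache.spark.io.SnappyCompressionCodec") == "snappy")
      }
    }
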
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 30075c172b..2091a9fe8d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -62,6 +62,15 @@ private[spark] class EventLoggingListener(
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+ private val compressionCodec =
+ if (shouldCompress) {
+ Some(CompressionCodec.createCodec(sparkConf))
+ } else {
+ None
+ }
+ private val compressionCodecName = compressionCodec.map { c =>
+ CompressionCodec.getShortName(c.getClass.getName)
+ }
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -80,7 +89,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
// Visible for tests only.
- private[scheduler] val logPath = getLogPath(logBaseDir, appId)
+ private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
/**
* Creates the log file in the configured log directory.
@@ -111,19 +120,19 @@ private[spark] class EventLoggingListener(
hadoopDataStream.get
}
- val compressionCodec =
- if (shouldCompress) {
- Some(CompressionCodec.createCodec(sparkConf))
- } else {
- None
- }
-
- fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
- val logStream = initEventLog(new BufferedOutputStream(dstream, outputBufferSize),
- compressionCodec)
- writer = Some(new PrintWriter(logStream))
+ try {
+ val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+ val bstream = new BufferedOutputStream(cstream, outputBufferSize)
- logInfo("Logging events to %s".format(logPath))
+ EventLoggingListener.initEventLog(bstream)
+ fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
+ writer = Some(new PrintWriter(bstream))
+ logInfo("Logging events to %s".format(logPath))
+ } catch {
+ case e: Exception =>
+ dstream.close()
+ throw e
+ }
}
/** Log the event as JSON. */
@@ -201,77 +210,57 @@ private[spark] object EventLoggingListener extends Logging {
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"
+ val SPARK_VERSION_KEY = "SPARK_VERSION"
+ val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
- // Marker for the end of header data in a log file. After this marker, log data, potentially
- // compressed, will be found.
- private val HEADER_END_MARKER = "=== LOG_HEADER_END ==="
-
- // To avoid corrupted files causing the heap to fill up. Value is arbitrary.
- private val MAX_HEADER_LINE_LENGTH = 4096
-
// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
/**
- * Write metadata about the event log to the given stream.
- *
- * The header is a serialized version of a map, except it does not use Java serialization to
- * avoid incompatibilities between different JDKs. It writes one map entry per line, in
- * "key=value" format.
+ * Write metadata about an event log to the given stream.
+ * The metadata is encoded in the first line of the event log as JSON.
*
- * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code
- * can know when to stop.
- *
- * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot
- * change in new Spark versions without some other way of detecting the change (like some
- * metadata encoded in the file name).
- *
- * @param logStream Raw output stream to the even log file.
- * @param compressionCodec Optional compression codec to use.
- * @return A stream where to write event log data. This may be a wrapper around the original
- * stream (for example, when compression is enabled).
+ * @param logStream Raw output stream to the event log file.
*/
- def initEventLog(
- logStream: OutputStream,
- compressionCodec: Option[CompressionCodec]): OutputStream = {
- val meta = mutable.HashMap(("version" -> SPARK_VERSION))
- compressionCodec.foreach { codec =>
- meta += ("compressionCodec" -> codec.getClass().getName())
- }
-
- def write(entry: String) = {
- val bytes = entry.getBytes(Charsets.UTF_8)
- if (bytes.length > MAX_HEADER_LINE_LENGTH) {
- throw new IOException(s"Header entry too long: ${entry}")
- }
- logStream.write(bytes, 0, bytes.length)
- }
-
- meta.foreach { case (k, v) => write(s"$k=$v\n") }
- write(s"$HEADER_END_MARKER\n")
- compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
+ def initEventLog(logStream: OutputStream): Unit = {
+ val metadata = SparkListenerLogStart(SPARK_VERSION)
+ val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n"
+ logStream.write(metadataJson.getBytes(Charsets.UTF_8))
}
/**
* Return a file-system-safe path to the log file for the given application.
*
+ * Note that because we currently only create a single log file for each application,
+ * we must encode all the information needed to parse this event log in the file name
+ * instead of within the file itself. Otherwise, if the file is compressed, for instance,
+ * we won't know which codec to use to decompress the metadata needed to open the file in
+ * the first place.
+ *
* @param logBaseDir Directory where the log file will be written.
* @param appId A unique app ID.
+ * @param compressionCodecName Name to identify the codec used to compress the contents
+ * of the log, or None if compression is not enabled.
* @return A path which consists of file-system-safe characters.
*/
- def getLogPath(logBaseDir: String, appId: String): String = {
- val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
- Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+ def getLogPath(
+ logBaseDir: String,
+ appId: String,
+ compressionCodecName: Option[String] = None): String = {
+ val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+ // e.g. app_123, app_123.lzf
+ val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
+ Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
}
/**
- * Opens an event log file and returns an input stream to the event data.
+ * Opens an event log file and returns an input stream that contains the event data.
*
- * @return 2-tuple (event input stream, Spark version of event data)
+ * @return input stream that holds one JSON record per line.
*/
- def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
+ def openEventLog(log: Path, fs: FileSystem): InputStream = {
// It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
// IOException when a file does not exist, so try our best to throw a proper exception.
if (!fs.exists(log)) {
@@ -279,52 +268,17 @@ private[spark] object EventLoggingListener extends Logging {
}
val in = new BufferedInputStream(fs.open(log))
- // Read a single line from the input stream without buffering.
- // We cannot use BufferedReader because we must avoid reading
- // beyond the end of the header, after which the content of the
- // file may be compressed.
- def readLine(): String = {
- val bytes = new ByteArrayOutputStream()
- var next = in.read()
- var count = 0
- while (next != '\n') {
- if (next == -1) {
- throw new IOException("Unexpected end of file.")
- }
- bytes.write(next)
- count = count + 1
- if (count > MAX_HEADER_LINE_LENGTH) {
- throw new IOException("Maximum header line length exceeded.")
- }
- next = in.read()
- }
- new String(bytes.toByteArray(), Charsets.UTF_8)
+
+ // Compression codec is encoded as an extension, e.g. app_123.lzf
+ // Since we sanitize the app ID to not include periods, it is safe to split on it
+ val logName = log.getName.stripSuffix(IN_PROGRESS)
+ val codecName: Option[String] = logName.split("\\.").tail.lastOption
+ val codec = codecName.map { c =>
+ codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
}
- // Parse the header metadata in the form of k=v pairs
- // This assumes that every line before the header end marker follows this format
try {
- val meta = new mutable.HashMap[String, String]()
- var foundEndMarker = false
- while (!foundEndMarker) {
- readLine() match {
- case HEADER_END_MARKER =>
- foundEndMarker = true
- case entry =>
- val prop = entry.split("=", 2)
- if (prop.length != 2) {
- throw new IllegalArgumentException("Invalid metadata in log file.")
- }
- meta += (prop(0) -> prop(1))
- }
- }
-
- val sparkVersion = meta.get("version").getOrElse(
- throw new IllegalArgumentException("Missing Spark version in log metadata."))
- val codec = meta.get("compressionCodec").map { codecName =>
- codecMap.getOrElseUpdate(codecName, CompressionCodec.createCodec(new SparkConf, codecName))
- }
- (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+ codec.map(_.compressedInputStream(in)).getOrElse(in)
} catch {
case e: Exception =>
in.close()
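
A standalone sketch of the new naming scheme: the codec travels in the file extension, and openEventLog recovers it by splitting on "." after stripping the .inprogress suffix. The helper names below are illustrative; the sanitization regexes and expected values mirror this hunk and the test added in EventLoggingListenerSuite:

    object EventLogNamingSketch {
      // Mirrors the name portion of getLogPath: sanitize the app ID, then append the codec.
      def logName(appId: String, codecShortName: Option[String]): String = {
        val sanitized = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
        sanitized + codecShortName.map("." + _).getOrElse("")
      }

      // Mirrors openEventLog: the codec, if any, is the extension of the (possibly in-progress) file.
      def codecFromName(fileName: String): Option[String] =
        fileName.stripSuffix(".inprogress").split("\\.").tail.lastOption

      def main(args: Array[String]): Unit = {
        assert(logName("app-123", None) == "app-123")
        assert(logName("a fine:mind$dollar{bills}.1", Some("lzf")) == "a-fine-mind_dollar_bills__1.lzf")
        assert(codecFromName("app-123.lzf.inprogress") == Some("lzf"))
        assert(codecFromName("app-123") == None)
      }
    }
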
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d9c3a10dc5..95273c716b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -39,10 +39,9 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
* error is thrown by this method.
*
* @param logData Stream containing event log data.
- * @param version Spark version that generated the events.
* @param sourceName Filename (or other source identifier) from whence @logData is being read
*/
- def replay(logData: InputStream, version: String, sourceName: String) {
+ def replay(logData: InputStream, sourceName: String): Unit = {
var currentLine: String = null
var lineNumber: Int = 1
try {
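
Since each log now identifies its own Spark version in its first line, replay only needs the stream plus a source name for error reporting. A minimal standalone sketch of that per-line contract, with a post callback standing in for JsonProtocol.sparkEventFromJson and the listener bus (an assumption, not the patch's code):

    import java.io.InputStream
    import scala.io.Source

    object ReplaySketch {
      // Feeds one JSON record per line to `post`, reporting the offending line on failure.
      def replay(logData: InputStream, sourceName: String)(post: String => Unit): Unit = {
        var currentLine: String = null
        var lineNumber = 1
        try {
          Source.fromInputStream(logData).getLines().foreach { line =>
            currentLine = line
            post(line)
            lineNumber += 1
          }
        } catch {
          case e: Exception =>
            System.err.println(s"Error parsing $sourceName at line $lineNumber: $currentLine")
            throw e
        }
      }
    }
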
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index dd28ddb31d..52720d48ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -116,6 +116,11 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String],
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+/**
+ * An internal class that describes the metadata of an event log.
+ * This event is not meant to be posted to listeners downstream.
+ */
+private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
/**
* :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index fe8a19a2c0..61e69ecc08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -58,6 +58,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
+ case logStart: SparkListenerLogStart => // ignore event log metadata
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a0aa555f62..ffd4825705 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -85,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend(
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- appUIAddress, sc.eventLogDir)
+ appUIAddress, sc.eventLogDir, sc.eventLogCodec)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e20864db5..474f79fb75 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -89,6 +89,8 @@ private[spark] object JsonProtocol {
executorAddedToJson(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
executorRemovedToJson(executorRemoved)
+ case logStart: SparkListenerLogStart =>
+ logStartToJson(logStart)
// These aren't used, but keeps compiler happy
case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
}
@@ -214,6 +216,11 @@ private[spark] object JsonProtocol {
("Removed Reason" -> executorRemoved.reason)
}
+ def logStartToJson(logStart: SparkListenerLogStart): JValue = {
+ ("Event" -> Utils.getFormattedClassName(logStart)) ~
+ ("Spark Version" -> SPARK_VERSION)
+ }
+
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
* -------------------------------------------------------------------- */
@@ -447,6 +454,7 @@ private[spark] object JsonProtocol {
val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
+ val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -464,6 +472,7 @@ private[spark] object JsonProtocol {
case `applicationEnd` => applicationEndFromJson(json)
case `executorAdded` => executorAddedFromJson(json)
case `executorRemoved` => executorRemovedFromJson(json)
+ case `logStart` => logStartFromJson(json)
}
}
@@ -574,6 +583,11 @@ private[spark] object JsonProtocol {
SparkListenerExecutorRemoved(time, executorId, reason)
}
+ def logStartFromJson(json: JValue): SparkListenerLogStart = {
+ val sparkVersion = (json \ "Spark Version").extract[String]
+ SparkListenerLogStart(sparkVersion)
+ }
+
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
* ---------------------------------------------------------------------- */
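
A small json4s round-trip sketch of the metadata line that initEventLog now writes as the first record; the field names follow logStartToJson above, while the standalone case class and object are illustrative:

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object LogStartJsonSketch {
      case class LogStart(sparkVersion: String)
      implicit val formats: Formats = DefaultFormats

      def toJson(e: LogStart): JValue =
        ("Event" -> "SparkListenerLogStart") ~ ("Spark Version" -> e.sparkVersion)

      def fromJson(json: JValue): LogStart =
        LogStart((json \ "Spark Version").extract[String])

      def main(args: Array[String]): Unit = {
        val line = compact(render(toJson(LogStart("1.3.0"))))
        // line == {"Event":"SparkListenerLogStart","Spark Version":"1.3.0"}
        assert(fromJson(parse(line)) == LogStart("1.3.0"))
      }
    }
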
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 85939eaadc..e908ba604e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,18 +17,17 @@
package org.apache.spark.deploy.history
-import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.net.URI
import scala.io.Source
-import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.Matchers
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io._
import org.apache.spark.scheduler._
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -45,18 +44,35 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
Utils.deleteRecursively(testDir)
}
+ /** Create a fake log file using the new log format used in Spark 1.3+ */
+ private def newLogFile(
+ appId: String,
+ inProgress: Boolean,
+ codec: Option[String] = None): File = {
+ val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
+ val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
+ val logPath = new URI(logUri).getPath + ip
+ new File(logPath)
+ }
+
test("Parse new and old application logs") {
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
- val newAppComplete = new File(testDir, "new1")
+ val newAppComplete = newLogFile("new1", inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
- SparkListenerApplicationEnd(4L)
+ SparkListenerApplicationEnd(5L)
)
+ // Write a new-style application log.
+ val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+ writeFile(newAppCompressedComplete, true, None,
+ SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+ SparkListenerApplicationEnd(4L))
+
// Write an unfinished app, new-style.
- val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+ val newAppIncomplete = newLogFile("new2", inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
)
@@ -89,16 +105,18 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val list = provider.getListing().toSeq
list should not be (null)
- list.size should be (4)
- list.count(e => e.completed) should be (2)
+ list.size should be (5)
+ list.count(_.completed) should be (3)
- list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
+ list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true))
- list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+ list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
+ "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+ list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
- list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
+ list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
-1L, oldAppIncomplete.lastModified(), "test", false))
- list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
+ list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
-1L, newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
@@ -127,7 +145,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val logPath = new Path(logDir.getAbsolutePath())
try {
- val (logInput, sparkVersion) = provider.openLegacyEventLog(logPath)
+ val logInput = provider.openLegacyEventLog(logPath)
try {
Source.fromInputStream(logInput).getLines().toSeq.size should be (2)
} finally {
@@ -141,12 +159,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-3697: ignore directories that cannot be read.") {
- val logFile1 = new File(testDir, "new1")
+ val logFile1 = newLogFile("new1", inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test"),
SparkListenerApplicationEnd(2L)
)
- val logFile2 = new File(testDir, "new2")
+ val logFile2 = newLogFile("new2", inProgress = false)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", None, 1L, "test"),
SparkListenerApplicationEnd(2L)
@@ -164,7 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+ val logFile1 = newLogFile("app1", inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationEnd(2L)
@@ -174,7 +192,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListBeforeRename.size should be (1)
appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
- logFile1.renameTo(new File(testDir, "app1"))
+ logFile1.renameTo(newLogFile("app1", inProgress = false))
provider.checkForLogs()
val appListAfterRename = provider.getListing()
appListAfterRename.size should be (1)
@@ -184,7 +202,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
- val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+ val logFile1 = newLogFile("app1", inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationEnd(2L))
@@ -199,14 +217,13 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
- val out =
- if (isNewFormat) {
- EventLoggingListener.initEventLog(new FileOutputStream(file), codec)
- } else {
- val fileStream = new FileOutputStream(file)
- codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream)
- }
- val writer = new OutputStreamWriter(out, "UTF-8")
+ val fstream = new FileOutputStream(file)
+ val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+ val bstream = new BufferedOutputStream(cstream)
+ if (isNewFormat) {
+ EventLoggingListener.initEventLog(new FileOutputStream(file))
+ }
+ val writer = new OutputStreamWriter(bstream, "UTF-8")
try {
events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
} finally {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 437d8693c0..992dde66f9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import java.io.{File, FileOutputStream, InputStream, IOException}
+import java.net.URI
import scala.collection.mutable
import scala.io.Source
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io._
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -78,7 +79,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
test("Basic event logging with compression") {
CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
- testEventLogging(compressionCodec = Some(codec))
+ testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
}
}
@@ -88,25 +89,35 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
test("End-to-end event logging with compression") {
CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
- testApplicationEventLogging(compressionCodec = Some(codec))
+ testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
}
}
test("Log overwriting") {
- val log = new FileOutputStream(new File(testDir, "test"))
- log.close()
- try {
- testEventLogging()
- assert(false)
- } catch {
- case e: IOException =>
- // Expected, since we haven't enabled log overwrite.
- }
-
+ val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+ val logPath = new URI(logUri).getPath
+ // Create file before writing the event log
+ new FileOutputStream(new File(logPath)).close()
+ // Expected IOException, since we haven't enabled log overwrite.
+ intercept[IOException] { testEventLogging() }
// Try again, but enable overwriting.
testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true"))
}
+ test("Event log name") {
+ // without compression
+ assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
+ // with compression
+ assert(s"file:/base-dir/app1.lzf" ===
+ EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
+ // illegal characters in app ID
+ assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
+ EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
+ // illegal characters in app ID with compression
+ assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
+ EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
+ }
+
/* ----------------- *
* Actual test logic *
* ----------------- */
@@ -140,15 +151,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
eventLogger.stop()
// Verify file contains exactly the two events logged
- val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
- fileSystem)
+ val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
try {
val lines = readLines(logData)
- assert(lines.size === 2)
- assert(lines(0).contains("SparkListenerApplicationStart"))
- assert(lines(1).contains("SparkListenerApplicationEnd"))
- assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
- assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+ val logStart = SparkListenerLogStart(SPARK_VERSION)
+ assert(lines.size === 3)
+ assert(lines(0).contains("SparkListenerLogStart"))
+ assert(lines(1).contains("SparkListenerApplicationStart"))
+ assert(lines(2).contains("SparkListenerApplicationEnd"))
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
} finally {
logData.close()
}
@@ -163,8 +176,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
+ val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI().toString()
- assert(eventLogger.logPath.startsWith(expectedLogDir + "/"))
+ assert(eventLogPath === EventLoggingListener.getLogPath(
+ expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
@@ -178,8 +193,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
eventExistenceListener.assertAllCallbacksInvoked()
// Make sure expected events exist in the log file.
- val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
- fileSystem)
+ val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+ val logStart = SparkListenerLogStart(SPARK_VERSION)
val lines = readLines(logData)
val eventSet = mutable.Set(
SparkListenerApplicationStart,
@@ -204,6 +219,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
}
}
}
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 702c4cb3bd..601694f57a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
- replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
+ replayer.replay(logData, logFilePath.toString)
} finally {
logData.close()
}
@@ -115,12 +115,12 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
assert(!eventLog.isDir)
// Replay events
- val (logData, version) = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
+ val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
val eventMonster = new EventMonster(conf)
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
- replayer.replay(logData, version, eventLog.getPath().toString)
+ replayer.replay(logData, eventLog.getPath().toString)
} finally {
logData.close()
}
@@ -150,11 +150,4 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
override def start() { }
}
-
- private def getCompressionCodec(codecName: String) = {
- val conf = new SparkConf
- conf.set("spark.io.compression.codec", codecName)
- CompressionCodec.createCodec(conf)
- }
-
}