aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala28
1 files changed, 21 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 08e7727db2..529a5b2bf1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
+ appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
@@ -54,8 +55,9 @@ private[spark] class EventLoggingListener(
import EventLoggingListener._
- def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
- this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+ def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+ this(appId, appAttemptId, logBaseDir, sparkConf,
+ SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -89,7 +91,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
// Visible for tests only.
- private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
+ private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
/**
* Creates the log file in the configured log directory.
@@ -252,8 +254,12 @@ private[spark] object EventLoggingListener extends Logging {
* we won't know which codec to use to decompress the metadata needed to open the file in
* the first place.
*
+ * The log file name will identify the compression codec used for the contents, if any.
+ * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+ *
* @param logBaseDir Directory where the log file will be written.
* @param appId A unique app ID.
+ * @param appAttemptId A unique attempt id of appId. May be the empty string.
* @param compressionCodecName Name to identify the codec used to compress the contents
* of the log, or None if compression is not enabled.
* @return A path which consists of file-system-safe characters.
@@ -261,11 +267,19 @@ private[spark] object EventLoggingListener extends Logging {
def getLogPath(
logBaseDir: URI,
appId: String,
+ appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
- val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
- // e.g. app_123, app_123.lzf
- val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
- logBaseDir.toString.stripSuffix("/") + "/" + logName
+ val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+ val codec = compressionCodecName.map("." + _).getOrElse("")
+ if (appAttemptId.isDefined) {
+ base + "_" + sanitize(appAttemptId.get) + codec
+ } else {
+ base + codec
+ }
+ }
+
+ private def sanitize(str: String): String = {
+ str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
}
/**