From 14632b7942c02a332c4d3814fb6b2611e3f76fc7 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 3 Apr 2015 11:54:31 -0700 Subject: [SPARK-6688] [core] Always use resolved URIs in EventLoggingListener. Author: Marcelo Vanzin Closes #5340 from vanzin/SPARK-6688 and squashes the following commits: ccfddd9 [Marcelo Vanzin] Resolve at the source. 20d2a34 [Marcelo Vanzin] [SPARK-6688] [core] Always use resolved URIs in EventLoggingListener. --- .../main/scala/org/apache/spark/SparkContext.scala | 6 ++++-- .../spark/deploy/ApplicationDescription.scala | 6 ++++-- .../spark/scheduler/EventLoggingListener.scala | 10 +++++----- .../deploy/history/FsHistoryProviderSuite.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 22 ++++++++++++++-------- .../spark/scheduler/ReplayListenerSuite.scala | 3 ++- 6 files changed, 30 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fd1838976e..3b73a8a8fd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -227,9 +227,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val appName = conf.get("spark.app.name") private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false) - private[spark] val eventLogDir: Option[String] = { + private[spark] val eventLogDir: Option[URI] = { if (isEventLogEnabled) { - Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/")) + val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR) + .stripSuffix("/") + Some(Utils.resolveURI(unresolvedDir)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 3d0d68de8f..b7ae9c1fc0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -17,13 +17,15 @@ package org.apache.spark.deploy +import java.net.URI + private[spark] class ApplicationDescription( val name: String, val maxCores: Option[Int], val memoryPerSlave: Int, val command: Command, var appUiUrl: String, - val eventLogDir: Option[String] = None, + val eventLogDir: Option[URI] = None, // short name of compression codec used when writing event logs, if any (e.g. lzf) val eventLogCodec: Option[String] = None) extends Serializable { @@ -36,7 +38,7 @@ private[spark] class ApplicationDescription( memoryPerSlave: Int = memoryPerSlave, command: Command = command, appUiUrl: String = appUiUrl, - eventLogDir: Option[String] = eventLogDir, + eventLogDir: Option[URI] = eventLogDir, eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription = new ApplicationDescription( name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index c0d889360a..08e7727db2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -47,21 +47,21 @@ import org.apache.spark.util.{JsonProtocol, Utils} */ private[spark] class EventLoggingListener( appId: String, - logBaseDir: String, + logBaseDir: URI, sparkConf: SparkConf, hadoopConf: Configuration) extends SparkListener with Logging { import EventLoggingListener._ - def this(appId: String, logBaseDir: String, sparkConf: SparkConf) = + def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) = this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf)) private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false) private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false) private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 - private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf) + private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) private val compressionCodec = if (shouldCompress) { Some(CompressionCodec.createCodec(sparkConf)) @@ -259,13 +259,13 @@ private[spark] object EventLoggingListener extends Logging { * @return A path which consists of file-system-safe characters. */ def getLogPath( - logBaseDir: String, + logBaseDir: URI, appId: String, compressionCodecName: Option[String] = None): String = { val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase // e.g. app_123, app_123.lzf val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("") - Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName + logBaseDir.toString.stripSuffix("/") + "/" + logName } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index e908ba604e..fcae603c7d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -50,7 +50,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers inProgress: Boolean, codec: Option[String] = None): File = { val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" - val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId) + val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId) val logPath = new URI(logUri).getPath + ip new File(logPath) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 448258a754..30ee63e78d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef test("Verify log file exist") { // Verify logging directory exists val conf = getLoggingConf(testDirPath) - val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf) + val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf) eventLogger.start() val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS) @@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef } test("Log overwriting") { - val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test") + val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test") val logPath = new URI(logUri).getPath // Create file before writing the event log new FileOutputStream(new File(logPath)).close() @@ -107,16 +107,19 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef test("Event log name") { // without compression - assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1")) + assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath( + Utils.resolveURI("/base-dir"), "app1")) // with compression assert(s"file:/base-dir/app1.lzf" === - EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf"))) + EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf"))) // illegal characters in app ID assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" === - EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1")) + EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), + "a fine:mind$dollar{bills}.1")) // illegal characters in app ID with compression assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" === - EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4"))) + EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), + "a fine:mind$dollar{bills}.1", Some("lz4"))) } /* ----------------- * @@ -137,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef val conf = getLoggingConf(testDirPath, compressionCodec) extraConf.foreach { case (k, v) => conf.set(k, v) } val logName = compressionCodec.map("test-" + _).getOrElse("test") - val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf) + val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf) val listenerBus = new LiveListenerBus val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey") @@ -173,12 +176,15 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef * This runs a simple Spark job and asserts that the expected events are logged when expected. */ private def testApplicationEventLogging(compressionCodec: Option[String] = None) { + // Set defaultFS to something that would cause an exception, to make sure we don't run + // into SPARK-6688. val conf = getLoggingConf(testDirPath, compressionCodec) + .set("spark.hadoop.fs.defaultFS", "unsupported://example.com") val sc = new SparkContext("local-cluster[2,2,512]", "test", conf) assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get val eventLogPath = eventLogger.logPath - val expectedLogDir = testDir.toURI().toString() + val expectedLogDir = testDir.toURI() assert(eventLogPath === EventLoggingListener.getLogPath( expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName))) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 601694f57a..6de6d2fec6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.io.{File, PrintWriter} +import java.net.URI import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} @@ -145,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { * log the events. */ private class EventMonster(conf: SparkConf) - extends EventLoggingListener("test", "testdir", conf) { + extends EventLoggingListener("test", new URI("testdir"), conf) { override def start() { } -- cgit v1.2.3