about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author     Marcelo Vanzin <vanzin@cloudera.com>    2015-04-03 11:54:31 -0700
committer  Andrew Or <andrew@databricks.com>       2015-04-03 11:55:11 -0700
commit    f17a2fe9b32c3bd6346718089d9db4da597df402 (patch)
tree      bc148ffc9c463cbfae87aca8cbbeb0fd8a40b9c0
parent    0c1b78b72bfb5ef806a6c5258a4e5b021b8d1912 (diff)
download  spark-f17a2fe9b32c3bd6346718089d9db4da597df402.tar.gz
          spark-f17a2fe9b32c3bd6346718089d9db4da597df402.tar.bz2
          spark-f17a2fe9b32c3bd6346718089d9db4da597df402.zip
[SPARK-6688] [core] Always use resolved URIs in EventLoggingListener.
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5340 from vanzin/SPARK-6688 and squashes the following commits:

ccfddd9 [Marcelo Vanzin] Resolve at the source.
20d2a34 [Marcelo Vanzin] [SPARK-6688] [core] Always use resolved URIs in EventLoggingListener.

(cherry picked from commit 14632b7942c02a332c4d3814fb6b2611e3f76fc7)
Signed-off-by: Andrew Or <andrew@databricks.com>
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                           |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala          |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala         | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala  |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala    | 22
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala          |  3
6 files changed, 30 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4aee423043..495227b201 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -227,9 +227,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val appName = conf.get("spark.app.name")
private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
- private[spark] val eventLogDir: Option[String] = {
+ private[spark] val eventLogDir: Option[URI] = {
if (isEventLogEnabled) {
- Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+ val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+ .stripSuffix("/")
+ Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 3d0d68de8f..b7ae9c1fc0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,13 +17,15 @@
package org.apache.spark.deploy
+import java.net.URI
+
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
- val eventLogDir: Option[String] = None,
+ val eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None)
extends Serializable {
@@ -36,7 +38,7 @@ private[spark] class ApplicationDescription(
memoryPerSlave: Int = memoryPerSlave,
command: Command = command,
appUiUrl: String = appUiUrl,
- eventLogDir: Option[String] = eventLogDir,
+ eventLogDir: Option[URI] = eventLogDir,
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
new ApplicationDescription(
name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 2091a9fe8d..d99db9e26c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,21 +47,21 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
- logBaseDir: String,
+ logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {
import EventLoggingListener._
- def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+ def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
- private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+ private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
private val compressionCodec =
if (shouldCompress) {
Some(CompressionCodec.createCodec(sparkConf))
@@ -246,13 +246,13 @@ private[spark] object EventLoggingListener extends Logging {
* @return A path which consists of file-system-safe characters.
*/
def getLogPath(
- logBaseDir: String,
+ logBaseDir: URI,
appId: String,
compressionCodecName: Option[String] = None): String = {
val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
// e.g. app_123, app_123.lzf
val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
- Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
+ logBaseDir.toString.stripSuffix("/") + "/" + logName
}
/**
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index e908ba604e..fcae603c7d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -50,7 +50,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
- val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
val logPath = new URI(logUri).getPath + ip
new File(logPath)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 992dde66f9..7e401664a6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -60,7 +60,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
- val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf)
+ val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
eventLogger.start()
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -94,7 +94,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
}
test("Log overwriting") {
- val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
val logPath = new URI(logUri).getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
@@ -106,16 +106,19 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
test("Event log name") {
// without compression
- assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
+ assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
+ Utils.resolveURI("/base-dir"), "app1"))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
- EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
- EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+ "a fine:mind$dollar{bills}.1"))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
- EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+ "a fine:mind$dollar{bills}.1", Some("lz4")))
}
/* ----------------- *
@@ -136,7 +139,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
- val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf)
+ val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
@@ -172,12 +175,15 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
* This runs a simple Spark job and asserts that the expected events are logged when expected.
*/
private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+ // Set defaultFS to something that would cause an exception, to make sure we don't run
+ // into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
+ .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val eventLogPath = eventLogger.logPath
- val expectedLogDir = testDir.toURI().toString()
+ val expectedLogDir = testDir.toURI()
assert(eventLogPath === EventLoggingListener.getLogPath(
expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 601694f57a..6de6d2fec6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import java.io.{File, PrintWriter}
+import java.net.URI
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -145,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* log the events.
*/
private class EventMonster(conf: SparkConf)
- extends EventLoggingListener("test", "testdir", conf) {
+ extends EventLoggingListener("test", new URI("testdir"), conf) {
override def start() { }