author     Marcelo Vanzin <vanzin@cloudera.com>    2014-04-23 14:47:38 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-04-23 14:47:38 -0700
commit     dd1b7a61d9193c93ab95ab550622259f4bc26f53 (patch)
tree       1f1ebc4f8d59a4413a70f2f63f43cf05af03a79c /core
parent     a967b005c8937a3053e215c952d2172ee3dc300d (diff)
Honor default fs name when initializing event logger.
This is related to SPARK-1459 / PR #375. Without this fix, FileLogger.createLogDir() may try to create the log dir on HDFS, while createWriter() will try to open the log file on the local file system, leading to interesting errors and confusion.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #450 from vanzin/event-file-2 and squashes the following commits:

592cdb3 [Marcelo Vanzin] Honor default fs name when initializing event logger.
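A minimal sketch of the mismatch described above, outside of Spark (the HDFS URI below is a placeholder): Hadoop's FileSystem API resolves a scheme-less path against the configured default fs name, while inspecting only URI.getScheme(), as the old createWriter() did, treats the same path as local.

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object DefaultFsMismatch {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        // Placeholder default fs, standing in for the value from core-site.xml.
        FileSystem.setDefaultUri(hadoopConf, "hdfs://namenode:8020")

        // A scheme-less directory, as a user might set spark.eventLog.dir.
        val logDir = "/user/spark/eventlogs"

        // createLogDir() goes through the FileSystem API, which fills in the
        // missing scheme from the default fs name, i.e. HDFS here.
        println(FileSystem.getDefaultUri(hadoopConf))   // hdfs://namenode:8020

        // The old createWriter() matched only on the URI scheme; null meant
        // "local file", so the writer landed on a different file system.
        println(new URI(logDir).getScheme)              // null
      }
    }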
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                    | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala  |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/FileLogger.scala                 | 17
3 files changed, 41 insertions(+), 33 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 25ca650a3a..c14dce8273 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val ui = new SparkUI(this)
ui.bind()
+ /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+ val hadoopConfiguration: Configuration = {
+ val env = SparkEnv.get
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+ // Explicitly check for S3 environment variables
+ if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+ System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+ hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ }
+ // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+ conf.getAll.foreach { case (key, value) =>
+ if (key.startsWith("spark.hadoop.")) {
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
+ }
+ }
+ val bufferSize = conf.get("spark.buffer.size", "65536")
+ hadoopConf.set("io.file.buffer.size", bufferSize)
+ hadoopConf
+ }
+
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
- val logger = new EventLoggingListener(appName, conf)
+ val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
postEnvironmentUpdate()
postApplicationStart()
- /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
- val hadoopConfiguration: Configuration = {
- val env = SparkEnv.get
- val hadoopConf = SparkHadoopUtil.get.newConfiguration()
- // Explicitly check for S3 environment variables
- if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
- System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- }
- // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- conf.getAll.foreach { case (key, value) =>
- if (key.startsWith("spark.hadoop.")) {
- hadoopConf.set(key.substring("spark.hadoop.".length), value)
- }
- }
- val bufferSize = conf.get("spark.buffer.size", "65536")
- hadoopConf.set("io.file.buffer.size", bufferSize)
- hadoopConf
- }
-
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
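As a hedged usage sketch (names and values below are placeholders, not from the patch), the relocated block means that any spark.hadoop.* entry shows up in sc.hadoopConfiguration with the prefix stripped, and spark.buffer.size feeds io.file.buffer.size:

    import org.apache.spark.{SparkConf, SparkContext}

    object HadoopConfDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")                      // placeholder master
          .setAppName("hadoop-conf-demo")             // placeholder app name
          .set("spark.hadoop.dfs.replication", "2")   // copied as dfs.replication
          .set("spark.buffer.size", "131072")         // copied as io.file.buffer.size
        val sc = new SparkContext(conf)
        assert(sc.hadoopConfiguration.get("dfs.replication") == "2")
        assert(sc.hadoopConfiguration.get("io.file.buffer.size") == "131072")
        sc.stop()
      }
    }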
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index b983c16af1..2fe65cd944 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
import scala.collection.mutable
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods._
@@ -36,7 +37,10 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
* spark.eventLog.dir - Path to the directory in which events are logged.
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
*/
-private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
+private[spark] class EventLoggingListener(
+ appName: String,
+ conf: SparkConf,
+ hadoopConfiguration: Configuration)
extends SparkListener with Logging {
import EventLoggingListener._
@@ -49,7 +53,8 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
val logDir = logBaseDir + "/" + name
private val logger =
- new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
+ new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
+ shouldOverwrite)
/**
* Begin logging events.
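For context, a small sketch of how this listener is exercised from user code (master, app name, and log directory are placeholders): with spark.eventLog.enabled set, SparkContext constructs the listener and, after this patch, passes it the shared hadoopConfiguration from the first hunk.

    import org.apache.spark.{SparkConf, SparkContext}

    object EventLogDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("event-log-demo")
          .set("spark.eventLog.enabled", "true")
          .set("spark.eventLog.dir", "file:///tmp/spark-events")  // could be an hdfs:// path on a cluster
          .set("spark.eventLog.buffer.kb", "100")
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 100).count()   // generates events for the listener to write
        sc.stop()
      }
    }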
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 7d58d1c765..7d47b2a72a 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -22,7 +22,8 @@ import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
-import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io.CompressionCodec
@@ -37,7 +38,8 @@ import org.apache.spark.io.CompressionCodec
*/
private[spark] class FileLogger(
logDir: String,
- conf: SparkConf = new SparkConf,
+ conf: SparkConf,
+ hadoopConfiguration: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true)
@@ -85,19 +87,20 @@ private[spark] class FileLogger(
private def createWriter(fileName: String): PrintWriter = {
val logPath = logDir + "/" + fileName
val uri = new URI(logPath)
+ val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
+ val isDefaultLocal = (defaultFs == null || defaultFs == "file")
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
* Therefore, for local files, use FileOutputStream instead. */
- val dstream = uri.getScheme match {
- case "file" | null =>
+ val dstream =
+ if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
// Second parameter is whether to append
new FileOutputStream(uri.getPath, !overwrite)
-
- case _ =>
+ } else {
val path = new Path(logPath)
hadoopDataStream = Some(fileSystem.create(path, overwrite))
hadoopDataStream.get
- }
+ }
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
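Finally, a standalone paraphrase of the new check (a hypothetical helper, not code from the patch): a scheme-less path is treated as local only when the default file system itself is local, so an HDFS-backed cluster no longer splits the log directory and the log file across two file systems.

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object LocalStreamCheck {
      // Mirrors the condition added to createWriter() above.
      def usesLocalStream(logPath: String, hadoopConf: Configuration): Boolean = {
        val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
        val isDefaultLocal = defaultFs == null || defaultFs == "file"
        val scheme = new URI(logPath).getScheme
        (isDefaultLocal && scheme == null) || scheme == "file"
      }
    }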