aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2014-08-30 14:48:07 -0700
committerMatei Zaharia <matei@databricks.com>2014-08-30 14:48:07 -0700
commitb6cf1348170951396a6a5d8a65fb670382304f5b (patch)
tree0214f05d66fcdb69373b143ffb998e4de52ff02a /core
parentd90434c03564558a4208f64e15b20009eabe3645 (diff)
downloadspark-b6cf1348170951396a6a5d8a65fb670382304f5b.tar.gz
spark-b6cf1348170951396a6a5d8a65fb670382304f5b.tar.bz2
spark-b6cf1348170951396a6a5d8a65fb670382304f5b.zip
[SPARK-2889] Create Hadoop config objects consistently.
Different places in the code were instantiating Configuration / YarnConfiguration objects in different ways. This could lead to confusion for people who actually expected "spark.hadoop.*" options to end up in the configs used by Spark code, since that would only happen for the SparkContext's config. This change modifies most places to use SparkHadoopUtil to initialize configs, and make that method do the translation that previously was only done inside SparkContext. The places that were not changed fall in one of the following categories: - Test code where this doesn't really matter - Places deep in the code where plumbing SparkConf would be too difficult for very little gain - Default values for arguments - since the caller can provide their own config in that case Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #1843 from vanzin/SPARK-2889 and squashes the following commits: 52daf35 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 f179013 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 51e71cf [Marcelo Vanzin] Add test to ensure that overriding Yarn configs works. 53f9506 [Marcelo Vanzin] Add DeveloperApi annotation. 3d345cb [Marcelo Vanzin] Restore old method for backwards compat. fc45067 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 0ac3fdf [Marcelo Vanzin] Merge branch 'master' into SPARK-2889 3f26760 [Marcelo Vanzin] Compilation fix. f16cadd [Marcelo Vanzin] Initialize config in SparkHadoopUtil. b8ab173 [Marcelo Vanzin] Update Utils API to take a Configuration argument. 1e7003f [Marcelo Vanzin] Replace explicit Configuration instantiation with SparkHadoopUtil.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala11
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/FileLogger.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala4
17 files changed, 106 insertions, 62 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e132955f0f..a80b3cce60 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging {
ui.bind()
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
- val hadoopConfiguration: Configuration = {
- val hadoopConf = SparkHadoopUtil.get.newConfiguration()
- // Explicitly check for S3 environment variables
- if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
- System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- }
- // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- conf.getAll.foreach { case (key, value) =>
- if (key.startsWith("spark.hadoop.")) {
- hadoopConf.set(key.substring("spark.hadoop.".length), value)
- }
- }
- val bufferSize = conf.get("spark.buffer.size", "65536")
- hadoopConf.set("io.file.buffer.size", bufferSize)
- hadoopConf
- }
+ val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
@@ -827,7 +808,8 @@ class SparkContext(config: SparkConf) extends Logging {
addedFiles(key) = System.currentTimeMillis
// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
+ Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+ hadoopConfiguration)
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 148115d3ed..fe0ad9ebbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,15 +24,18 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
import scala.collection.JavaConversions._
/**
+ * :: DeveloperApi ::
* Contains util methods to interact with Hadoop from Spark.
*/
+@DeveloperApi
class SparkHadoopUtil extends Logging {
- val conf: Configuration = newConfiguration()
+ val conf: Configuration = newConfiguration(new SparkConf())
UserGroupInformation.setConfiguration(conf)
/**
@@ -64,11 +67,39 @@ class SparkHadoopUtil extends Logging {
}
}
+ @Deprecated
+ def newConfiguration(): Configuration = newConfiguration(null)
+
/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.
*/
- def newConfiguration(): Configuration = new Configuration()
+ def newConfiguration(conf: SparkConf): Configuration = {
+ val hadoopConf = new Configuration()
+
+ // Note: this null check is around more than just access to the "conf" object to maintain
+ // the behavior of the old implementation of this code, for backwards compatibility.
+ if (conf != null) {
+ // Explicitly check for S3 environment variables
+ if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+ System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+ hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ }
+ // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+ conf.getAll.foreach { case (key, value) =>
+ if (key.startsWith("spark.hadoop.")) {
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
+ }
+ }
+ val bufferSize = conf.get("spark.buffer.size", "65536")
+ hadoopConf.set("io.file.buffer.size", bufferSize)
+ }
+
+ hadoopConf
+ }
/**
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
@@ -86,7 +117,7 @@ class SparkHadoopUtil extends Logging {
def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
- def loginUserFromKeytab(principalName: String, keytabFilename: String) {
+ def loginUserFromKeytab(principalName: String, keytabFilename: String) {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index cc06540ee0..05c8a90782 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
- private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
+ private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
+ SparkHadoopUtil.get.newConfiguration(conf))
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5017273e87..2a66fcfe48 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,7 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
+ SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -673,7 +674,8 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
- val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
+ val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+ SparkHadoopUtil.get.newConfiguration(conf))
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 5caaf6bea3..9f99117625 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -28,8 +28,8 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}
-import org.apache.spark.Logging
-import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
* This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
+ val conf: SparkConf,
val driverId: String,
val workDir: File,
val sparkHome: File,
@@ -144,8 +145,8 @@ private[spark] class DriverRunner(
val jarPath = new Path(driverDesc.jarUrl)
- val emptyConf = new Configuration()
- val jarFileSystem = jarPath.getFileSystem(emptyConf)
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val jarFileSystem = jarPath.getFileSystem(hadoopConf)
val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
@@ -154,7 +155,7 @@ private[spark] class DriverRunner(
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
- FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
+ FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
}
if (!localJarFile.exists()) { // Verify copy succeeded
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 81400af22c..e475567db6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -257,7 +257,7 @@ private[spark] class Worker(
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
- case Some(executor) =>
+ case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -288,7 +288,7 @@ private[spark] class Worker(
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+ val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2f76e532ae..d7d19f6fa3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -294,9 +295,9 @@ private[spark] class Executor(
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
- val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
- classOf[Boolean])
- constructor.newInstance(classUri, parent, userClassPathFirst)
+ val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
+ classOf[ClassLoader], classOf[Boolean])
+ constructor.newInstance(conf, classUri, parent, userClassPathFirst)
} catch {
case _: ClassNotFoundException =>
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
@@ -313,16 +314,19 @@ private[spark] class Executor(
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
+ hadoopConf)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
+ hadoopConf)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 20938781ac..7ba1182f0e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -157,7 +157,7 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 370fcd85aa..4b99f63044 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,11 +44,14 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
private[spark] class EventLoggingListener(
appName: String,
sparkConf: SparkConf,
- hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
+ hadoopConf: Configuration)
extends SparkListener with Logging {
import EventLoggingListener._
+ def this(appName: String, sparkConf: SparkConf) =
+ this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 4f7133c4bc..bc7670f4a8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class SimrSchedulerBackend(
@@ -44,7 +45,7 @@ private[spark] class SimrSchedulerBackend(
sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val conf = new Configuration()
+ val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
logInfo("Writing to HDFS file: " + driverFilePath)
@@ -63,7 +64,7 @@ private[spark] class SimrSchedulerBackend(
}
override def stop() {
- val conf = new Configuration()
+ val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
super.stop()
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index ad8b79af87..6d1fc05a15 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -41,13 +41,22 @@ import org.apache.spark.io.CompressionCodec
private[spark] class FileLogger(
logDir: String,
sparkConf: SparkConf,
- hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
+ hadoopConf: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true,
dirPermissions: Option[FsPermission] = None)
extends Logging {
+ def this(
+ logDir: String,
+ sparkConf: SparkConf,
+ compress: Boolean = false,
+ overwrite: Boolean = true) = {
+ this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
+ overwrite = overwrite)
+ }
+
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
@@ -57,7 +66,7 @@ private[spark] class FileLogger(
* create unique FileSystem instance only for FileLogger
*/
private val fileSystem = {
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val logUri = new URI(logDir)
val scheme = logUri.getScheme
if (scheme == "hdfs") {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 86f646d2af..0ae28f911e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,6 +34,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}
@@ -318,7 +319,8 @@ private[spark] object Utils extends Logging {
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
- def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
+ def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
+ hadoopConf: Configuration) {
val filename = url.split("/").last
val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
@@ -390,7 +392,7 @@ private[spark] object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
- val fs = getHadoopFileSystem(uri)
+ val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
@@ -862,8 +864,8 @@ private[spark] object Utils extends Logging {
*/
def getCallSite: CallSite = {
val trace = Thread.currentThread.getStackTrace()
- .filterNot { ste:StackTraceElement =>
- // When running under some profilers, the current stack trace might contain some bogus
+ .filterNot { ste:StackTraceElement =>
+ // When running under some profilers, the current stack trace might contain some bogus
// frames. This is intended to ensure that we don't crash in these situations by
// ignoring any frames that we can't examine.
(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
@@ -1179,15 +1181,15 @@ private[spark] object Utils extends Logging {
/**
* Return a Hadoop FileSystem with the scheme encoded in the given path.
*/
- def getHadoopFileSystem(path: URI): FileSystem = {
- FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
+ def getHadoopFileSystem(path: URI, conf: Configuration): FileSystem = {
+ FileSystem.get(path, conf)
}
/**
* Return a Hadoop FileSystem with the scheme encoded in the given path.
*/
- def getHadoopFileSystem(path: String): FileSystem = {
- getHadoopFileSystem(new URI(path))
+ def getHadoopFileSystem(path: String, conf: Configuration): FileSystem = {
+ getHadoopFileSystem(new URI(path), conf)
}
/**
@@ -1264,7 +1266,7 @@ private[spark] object Utils extends Logging {
}
}
- /**
+ /**
* Execute the given block, logging and re-throwing any uncaught exception.
* This is particularly useful for wrapping code that runs in a thread, to ensure
* that exceptions are printed, and to avoid having to catch Throwable.
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 31aa7ec837..2a58c6a40d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -121,8 +121,8 @@ class JsonProtocolSuite extends FunSuite {
new SparkConf, ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
- new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
- null, "akka://worker")
+ new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
+ createDriverDesc(), null, "akka://worker")
}
def assertValidJson(json: JValue) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index c930839b47..b6f4411e05 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -25,14 +25,15 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
import org.apache.spark.deploy.{Command, DriverDescription}
class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
- new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
- null, "akka://1.2.3.4/worker/")
+ new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
+ driverDescription, null, "akka://1.2.3.4/worker/")
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 10d8b29931..41e58a008c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +40,8 @@ import java.io.File
* read and deserialized into actual SparkListenerEvents.
*/
class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
- private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val fileSystem = Utils.getHadoopFileSystem("/",
+ SparkHadoopUtil.get.newConfiguration(new SparkConf()))
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6b6e0104e5..8f0ee9f4db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -32,7 +33,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
* Test whether ReplayListenerBus replays events from logs correctly.
*/
class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
- private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val fileSystem = Utils.getHadoopFileSystem("/",
+ SparkHadoopUtil.get.newConfiguration(new SparkConf()))
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private var testDir: File = _
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index 44332fc8db..c3dd156b40 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
/**
* Test writing files through the FileLogger.
*/
class FileLoggerSuite extends FunSuite with BeforeAndAfter {
- private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val fileSystem = Utils.getHadoopFileSystem("/",
+ SparkHadoopUtil.get.newConfiguration(new SparkConf()))
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"