-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 39
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala | 11
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/FileLogger.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala | 4
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala | 8
-rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala | 10
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 3
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 3
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala | 8
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala | 9
-rw-r--r--  yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 15
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4
28 files changed, 144 insertions(+), 94 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e132955f0f..a80b3cce60 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging {
ui.bind()
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
- val hadoopConfiguration: Configuration = {
- val hadoopConf = SparkHadoopUtil.get.newConfiguration()
- // Explicitly check for S3 environment variables
- if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
- System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- }
- // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- conf.getAll.foreach { case (key, value) =>
- if (key.startsWith("spark.hadoop.")) {
- hadoopConf.set(key.substring("spark.hadoop.".length), value)
- }
- }
- val bufferSize = conf.get("spark.buffer.size", "65536")
- hadoopConf.set("io.file.buffer.size", bufferSize)
- hadoopConf
- }
+ val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
@@ -827,7 +808,8 @@ class SparkContext(config: SparkConf) extends Logging {
addedFiles(key) = System.currentTimeMillis
// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
+ Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+ hadoopConfiguration)
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
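
Reviewer note (not part of the patch): the inlined block removed above now lives in SparkHadoopUtil.newConfiguration(SparkConf) (see the next file), so sc.hadoopConfiguration should behave as before. A minimal sketch of that behavior, with illustrative key names:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical settings; the key after "spark.hadoop." is only illustrative.
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("hadoop-conf-check")
      .set("spark.hadoop.mapreduce.task.timeout", "60000")
    val sc = new SparkContext(conf)

    // "spark.hadoop.*" entries are still copied into the Hadoop Configuration
    // with the prefix stripped, exactly as the removed inline block did.
    assert(sc.hadoopConfiguration.get("mapreduce.task.timeout") == "60000")
    sc.stop()
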
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 148115d3ed..fe0ad9ebbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,15 +24,18 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
import scala.collection.JavaConversions._
/**
+ * :: DeveloperApi ::
* Contains util methods to interact with Hadoop from Spark.
*/
+@DeveloperApi
class SparkHadoopUtil extends Logging {
- val conf: Configuration = newConfiguration()
+ val conf: Configuration = newConfiguration(new SparkConf())
UserGroupInformation.setConfiguration(conf)
/**
@@ -64,11 +67,39 @@ class SparkHadoopUtil extends Logging {
}
}
+ @Deprecated
+ def newConfiguration(): Configuration = newConfiguration(null)
+
/**
 * Return an appropriate (subclass) of Configuration. Creating config can initialize some Hadoop
* subsystems.
*/
- def newConfiguration(): Configuration = new Configuration()
+ def newConfiguration(conf: SparkConf): Configuration = {
+ val hadoopConf = new Configuration()
+
+ // Note: this null check is around more than just access to the "conf" object to maintain
+ // the behavior of the old implementation of this code, for backwards compatibility.
+ if (conf != null) {
+ // Explicitly check for S3 environment variables
+ if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+ System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+ hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ }
+ // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+ conf.getAll.foreach { case (key, value) =>
+ if (key.startsWith("spark.hadoop.")) {
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
+ }
+ }
+ val bufferSize = conf.get("spark.buffer.size", "65536")
+ hadoopConf.set("io.file.buffer.size", bufferSize)
+ }
+
+ hadoopConf
+ }
/**
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
@@ -86,7 +117,7 @@ class SparkHadoopUtil extends Logging {
def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
- def loginUserFromKeytab(principalName: String, keytabFilename: String) {
+ def loginUserFromKeytab(principalName: String, keytabFilename: String) {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
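
Reviewer note (not part of the patch): a minimal sketch of what the new overload does with a SparkConf; the key names and values below are examples only:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val sparkConf = new SparkConf()
      .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020") // example value
      .set("spark.buffer.size", "131072")

    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

    // The "spark.hadoop." prefix is stripped when keys are copied across.
    assert(hadoopConf.get("fs.defaultFS") == "hdfs://namenode:8020")
    // spark.buffer.size feeds io.file.buffer.size (defaulting to "65536").
    assert(hadoopConf.get("io.file.buffer.size") == "131072")
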
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index cc06540ee0..05c8a90782 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
- private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
+ private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
+ SparkHadoopUtil.get.newConfiguration(conf))
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5017273e87..2a66fcfe48 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,7 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
+ SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -673,7 +674,8 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
- val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
+ val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+ SparkHadoopUtil.get.newConfiguration(conf))
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 5caaf6bea3..9f99117625 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -28,8 +28,8 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}
-import org.apache.spark.Logging
-import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
* This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
+ val conf: SparkConf,
val driverId: String,
val workDir: File,
val sparkHome: File,
@@ -144,8 +145,8 @@ private[spark] class DriverRunner(
val jarPath = new Path(driverDesc.jarUrl)
- val emptyConf = new Configuration()
- val jarFileSystem = jarPath.getFileSystem(emptyConf)
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val jarFileSystem = jarPath.getFileSystem(hadoopConf)
val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
@@ -154,7 +155,7 @@ private[spark] class DriverRunner(
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
- FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
+ FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
}
if (!localJarFile.exists()) { // Verify copy succeeded
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 81400af22c..e475567db6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -257,7 +257,7 @@ private[spark] class Worker(
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
- case Some(executor) =>
+ case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -288,7 +288,7 @@ private[spark] class Worker(
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+ val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2f76e532ae..d7d19f6fa3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -294,9 +295,9 @@ private[spark] class Executor(
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
- val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
- classOf[Boolean])
- constructor.newInstance(classUri, parent, userClassPathFirst)
+ val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
+ classOf[ClassLoader], classOf[Boolean])
+ constructor.newInstance(conf, classUri, parent, userClassPathFirst)
} catch {
case _: ClassNotFoundException =>
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
@@ -313,16 +314,19 @@ private[spark] class Executor(
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
+ hadoopConf)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
+ hadoopConf)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 20938781ac..7ba1182f0e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -157,7 +157,7 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 370fcd85aa..4b99f63044 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,11 +44,14 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
private[spark] class EventLoggingListener(
appName: String,
sparkConf: SparkConf,
- hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
+ hadoopConf: Configuration)
extends SparkListener with Logging {
import EventLoggingListener._
+ def this(appName: String, sparkConf: SparkConf) =
+ this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
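
Reviewer note (not part of the patch): with the default argument replaced by an explicit secondary constructor, code inside the spark package can build the listener either way; a hypothetical sketch:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.scheduler.EventLoggingListener

    val sparkConf = new SparkConf().set("spark.eventLog.dir", "/tmp/spark-events") // example dir

    // Two-argument form: the Hadoop conf is derived from the SparkConf internally.
    val listener = new EventLoggingListener("myApp", sparkConf)

    // Three-argument form: the caller supplies its own Hadoop Configuration.
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    val listenerWithConf = new EventLoggingListener("myApp", sparkConf, hadoopConf)
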
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 4f7133c4bc..bc7670f4a8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class SimrSchedulerBackend(
@@ -44,7 +45,7 @@ private[spark] class SimrSchedulerBackend(
sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val conf = new Configuration()
+ val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
logInfo("Writing to HDFS file: " + driverFilePath)
@@ -63,7 +64,7 @@ private[spark] class SimrSchedulerBackend(
}
override def stop() {
- val conf = new Configuration()
+ val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
super.stop()
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index ad8b79af87..6d1fc05a15 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -41,13 +41,22 @@ import org.apache.spark.io.CompressionCodec
private[spark] class FileLogger(
logDir: String,
sparkConf: SparkConf,
- hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
+ hadoopConf: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true,
dirPermissions: Option[FsPermission] = None)
extends Logging {
+ def this(
+ logDir: String,
+ sparkConf: SparkConf,
+ compress: Boolean = false,
+ overwrite: Boolean = true) = {
+ this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
+ overwrite = overwrite)
+ }
+
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
@@ -57,7 +66,7 @@ private[spark] class FileLogger(
* create unique FileSystem instance only for FileLogger
*/
private val fileSystem = {
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val logUri = new URI(logDir)
val scheme = logUri.getScheme
if (scheme == "hdfs") {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 86f646d2af..0ae28f911e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,6 +34,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}
@@ -318,7 +319,8 @@ private[spark] object Utils extends Logging {
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
- def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
+ def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
+ hadoopConf: Configuration) {
val filename = url.split("/").last
val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
@@ -390,7 +392,7 @@ private[spark] object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
- val fs = getHadoopFileSystem(uri)
+ val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
@@ -862,8 +864,8 @@ private[spark] object Utils extends Logging {
*/
def getCallSite: CallSite = {
val trace = Thread.currentThread.getStackTrace()
- .filterNot { ste:StackTraceElement =>
- // When running under some profilers, the current stack trace might contain some bogus
+ .filterNot { ste:StackTraceElement =>
+ // When running under some profilers, the current stack trace might contain some bogus
// frames. This is intended to ensure that we don't crash in these situations by
// ignoring any frames that we can't examine.
(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
@@ -1179,15 +1181,15 @@ private[spark] object Utils extends Logging {
/**
* Return a Hadoop FileSystem with the scheme encoded in the given path.
*/
- def getHadoopFileSystem(path: URI): FileSystem = {
- FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
+ def getHadoopFileSystem(path: URI, conf: Configuration): FileSystem = {
+ FileSystem.get(path, conf)
}
/**
* Return a Hadoop FileSystem with the scheme encoded in the given path.
*/
- def getHadoopFileSystem(path: String): FileSystem = {
- getHadoopFileSystem(new URI(path))
+ def getHadoopFileSystem(path: String, conf: Configuration): FileSystem = {
+ getHadoopFileSystem(new URI(path), conf)
}
/**
@@ -1264,7 +1266,7 @@ private[spark] object Utils extends Logging {
}
}
- /**
+ /**
* Execute the given block, logging and re-throwing any uncaught exception.
* This is particularly useful for wrapping code that runs in a thread, to ensure
* that exceptions are printed, and to avoid having to catch Throwable.
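
Reviewer note (not part of the patch): the file-system helpers no longer build a Configuration themselves; callers now pass one in, typically derived from their SparkConf. A hypothetical internal caller (Utils is private[spark], so this only compiles inside the org.apache.spark package):

    import java.net.URI

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.util.Utils

    // Build the Hadoop conf once from the application's SparkConf...
    val sparkConf = new SparkConf()
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

    // ...and pass it explicitly to the helpers.
    val fsFromUri = Utils.getHadoopFileSystem(new URI("file:///tmp"), hadoopConf)
    val fsFromString = Utils.getHadoopFileSystem("/tmp", hadoopConf)
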
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 31aa7ec837..2a58c6a40d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -121,8 +121,8 @@ class JsonProtocolSuite extends FunSuite {
new SparkConf, ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
- new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
- null, "akka://worker")
+ new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
+ createDriverDesc(), null, "akka://worker")
}
def assertValidJson(json: JValue) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index c930839b47..b6f4411e05 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -25,14 +25,15 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
import org.apache.spark.deploy.{Command, DriverDescription}
class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
- new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
- null, "akka://1.2.3.4/worker/")
+ new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
+ driverDescription, null, "akka://1.2.3.4/worker/")
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 10d8b29931..41e58a008c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +40,8 @@ import java.io.File
* read and deserialized into actual SparkListenerEvents.
*/
class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
- private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val fileSystem = Utils.getHadoopFileSystem("/",
+ SparkHadoopUtil.get.newConfiguration(new SparkConf()))
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6b6e0104e5..8f0ee9f4db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -32,7 +33,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
* Test whether ReplayListenerBus replays events from logs correctly.
*/
class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
- private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val fileSystem = Utils.getHadoopFileSystem("/",
+ SparkHadoopUtil.get.newConfiguration(new SparkConf()))
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private var testDir: File = _
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index 44332fc8db..c3dd156b40 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
/**
* Test writing files through the FileLogger.
*/
class FileLoggerSuite extends FunSuite with BeforeAndAfter {
- private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val fileSystem = Utils.getHadoopFileSystem("/",
+ SparkHadoopUtil.get.newConfiguration(new SparkConf()))
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index d583cf421e..3258510894 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -22,9 +22,9 @@ import java.util.Random
import scala.math.exp
import breeze.linalg.{Vector, DenseVector}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo
@@ -70,7 +70,7 @@ object SparkHdfsLR {
val sparkConf = new SparkConf().setAppName("SparkHdfsLR")
val inputPath = args(0)
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val conf = new Configuration()
val sc = new SparkContext(sparkConf,
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
index 2212762186..96d13612e4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -22,9 +22,9 @@ import java.util.Random
import scala.math.exp
import breeze.linalg.{Vector, DenseVector}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.storage.StorageLevel
@@ -52,8 +52,8 @@ object SparkTachyonHdfsLR {
def main(args: Array[String]) {
val inputPath = args(0)
- val conf = SparkHadoopUtil.get.newConfiguration()
val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR")
+ val conf = new Configuration()
val sc = new SparkContext(sparkConf,
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 687e85ca94..5ee325008a 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -21,10 +21,10 @@ import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{URI, URL, URLEncoder}
import java.util.concurrent.{Executors, ExecutorService}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
import org.apache.spark.util.ParentClassLoader
@@ -36,7 +36,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
* used to load classes defined by the interpreter when the REPL is used.
* Allows the user to specify if user class path should be first
*/
-class ExecutorClassLoader(classUri: String, parent: ClassLoader,
+class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
userClassPathFirst: Boolean) extends ClassLoader {
val uri = new URI(classUri)
val directory = uri.getPath
@@ -48,7 +48,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader,
if (uri.getScheme() == "http") {
null
} else {
- FileSystem.get(uri, new Configuration())
+ FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf))
}
}
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index c0af7ceb6d..3e2ee7541f 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.FunSuite
import com.google.common.io.Files
-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.util.Utils
class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
@@ -57,7 +57,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
@@ -65,7 +65,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(url1, parentLoader, false)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, false)
val fakeClass = classLoader.loadClass("ReplFakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -73,7 +73,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -81,7 +81,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9be78546c1..12f1cd3813 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
extends YarnClientImpl with ClientBase with Logging {
def this(clientArgs: ClientArguments, spConf: SparkConf) =
- this(clientArgs, new Configuration(), spConf)
+ this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 5f66a98e75..8c54840971 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -48,7 +48,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// optimal as more containers are available. Might need to handle this better.
private val sparkConf = new SparkConf()
- private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
+ private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
+ .asInstanceOf[YarnConfiguration]
private val isDriver = args.userClass != null
// Default to numExecutors * 2, with minimum of 3
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2aa27a1908..ffe2731ca1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Return an appropriate (subclass) of Configuration. Creating config can initialize some Hadoop subsystems.
// Always create a new config, don't reuse yarnConf.
- override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+ override def newConfiguration(conf: SparkConf): Configuration =
+ new YarnConfiguration(super.newConfiguration(conf))
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index d162b4c433..254774a6b8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -26,14 +25,11 @@ import org.apache.spark.util.Utils
/**
* This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
*/
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
- extends TaskSchedulerImpl(sc) {
-
- def this(sc: SparkContext) = this(sc, new Configuration())
+private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(conf, host))
+ Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 69f40225a2..4157ff95c2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -21,19 +21,15 @@ import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
-import org.apache.hadoop.conf.Configuration
/**
* This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
* ApplicationMaster, etc is done
*/
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
- extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
logInfo("Created YarnClusterScheduler")
- def this(sc: SparkContext) = this(sc, new Configuration())
-
// Nothing else for now ... initialize application master : which needs a SparkContext to
// determine how to allocate.
// Note that only the first creation of a SparkContext influences (and ideally, there must be
@@ -43,8 +39,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
+ Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
override def postStartHook() {
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 7650bd4396..75db8ee6d4 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.{FunSuite, Matchers}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
@@ -61,4 +62,16 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
}
}
+ test("Yarn configuration override") {
+ val key = "yarn.nodemanager.hostname"
+ val default = new YarnConfiguration()
+
+ val sparkConf = new SparkConf()
+ .set("spark.hadoop." + key, "someHostName")
+ val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
+
+ yarnConf.getClass() should be (classOf[YarnConfiguration])
+ yarnConf.get(key) should not be default.get(key)
+ }
+
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1f9a4bf209..313a0d21ce 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf}
-
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
@@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
val yarnClient = YarnClient.createYarnClient
def this(clientArgs: ClientArguments, spConf: SparkConf) =
- this(clientArgs, new Configuration(), spConf)
+ this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())