Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala  29
1 file changed, 16 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 06b7b388ca..cda9d38c6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -74,13 +74,12 @@ class SparkHadoopUtil extends Logging {
}
}
+
/**
- * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
- * subsystems.
+ * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
+ * configuration.
*/
- def newConfiguration(conf: SparkConf): Configuration = {
- val hadoopConf = new Configuration()
-
+ def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
// Note: this null check is around more than just access to the "conf" object to maintain
// the behavior of the old implementation of this code, for backwards compatibility.
if (conf != null) {
@@ -106,7 +105,15 @@ class SparkHadoopUtil extends Logging {
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
}
+ }
+ /**
+ * Return an appropriate (subclass of) Configuration. Creating a config can initialize some
+ * Hadoop subsystems.
+ */
+ def newConfiguration(conf: SparkConf): Configuration = {
+ val hadoopConf = new Configuration()
+ appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
hadoopConf
}
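
The hunk above extracts the config-copying logic so callers can augment a Configuration they already hold instead of always building a fresh one. A minimal sketch of such a caller, assuming the usual SparkHadoopUtil.get accessor; the SparkConf keys below are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// Any spark.hadoop.* key is copied over with the prefix stripped, and
// spark.buffer.size maps to io.file.buffer.size (default "65536").
val sparkConf = new SparkConf()
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
  .set("spark.buffer.size", "131072")

// Unlike newConfiguration, this can layer Spark settings onto an existing
// Configuration, e.g. one preloaded from *-site.xml files.
val hadoopConf = new Configuration()
SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkConf, hadoopConf)

assert(hadoopConf.get("fs.defaultFS") == "hdfs://namenode:8020")
assert(hadoopConf.get("io.file.buffer.size") == "131072")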
@@ -145,10 +152,9 @@ class SparkHadoopUtil extends Logging {
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
} catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
+ case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
None
- }
}
}
@@ -167,10 +173,9 @@ class SparkHadoopUtil extends Logging {
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
+ case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
- }
}
}
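
The two hunks above are the same mechanical cleanup: in Scala a case body already extends to the next case (or the end of the catch block), so the braces were redundant. A self-contained sketch of the idiom, with hypothetical names:

// Hypothetical example of the brace-less case style adopted above.
def parseIntOpt(s: String): Option[Int] =
  try {
    Some(s.trim.toInt)
  } catch {
    // One handler for several exception types, bound to `e` via `@`;
    // the multi-statement body needs no braces.
    case e @ (_: NumberFormatException | _: NullPointerException) =>
      println("could not parse input: " + e)
      None
  }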
@@ -308,7 +313,7 @@ class SparkHadoopUtil extends Logging {
*/
def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = {
text match {
- case HADOOP_CONF_PATTERN(matched) => {
+ case HADOOP_CONF_PATTERN(matched) =>
logDebug(text + " matched " + HADOOP_CONF_PATTERN)
val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
val eval = Option[String](hadoopConf.get(key))
@@ -323,11 +328,9 @@ class SparkHadoopUtil extends Logging {
// Continue to substitute more variables.
substituteHadoopVariables(eval.get, hadoopConf)
}
- }
- case _ => {
+ case _ =>
logDebug(text + " didn't match " + HADOOP_CONF_PATTERN)
text
- }
}
}
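
For reference, a standalone sketch of the substitution this method performs. HADOOP_CONF_PATTERN is defined elsewhere in this file; the regex below is an assumption matching the ${hadoopconf-<key>} form the code strips (13 characters of prefix plus a closing brace):

import org.apache.hadoop.conf.Configuration

// Assumed shape of HADOOP_CONF_PATTERN: an unanchored match for ${hadoopconf-key}.
val HadoopConfPattern = """(\$\{hadoopconf-[^\}\$\s]+\})""".r.unanchored

def substitute(text: String, hadoopConf: Configuration): String = text match {
  case HadoopConfPattern(matched) =>
    val key = matched.substring(13, matched.length - 1) // remove ${hadoopconf- .. }
    Option(hadoopConf.get(key)) match {
      case Some(value) => substitute(text.replace(matched, value), hadoopConf) // keep substituting
      case None => text // key not set: leave the text unchanged
    }
  case _ => text
}

// e.g. with yarn.resourcemanager.hostname=rm-host set in hadoopConf:
//   substitute("${hadoopconf-yarn.resourcemanager.hostname}:8088", hadoopConf) == "rm-host:8088"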