diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 06b7b388ca..cda9d38c6a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -74,13 +74,12 @@ class SparkHadoopUtil extends Logging { } } + /** - * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop - * subsystems. + * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop + * configuration. */ - def newConfiguration(conf: SparkConf): Configuration = { - val hadoopConf = new Configuration() - + def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = { // Note: this null check is around more than just access to the "conf" object to maintain // the behavior of the old implementation of this code, for backwards compatibility. if (conf != null) { @@ -106,7 +105,15 @@ class SparkHadoopUtil extends Logging { val bufferSize = conf.get("spark.buffer.size", "65536") hadoopConf.set("io.file.buffer.size", bufferSize) } + } + /** + * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop + * subsystems. + */ + def newConfiguration(conf: SparkConf): Configuration = { + val hadoopConf = new Configuration() + appendS3AndSparkHadoopConfigurations(conf, hadoopConf) hadoopConf } @@ -145,10 +152,9 @@ class SparkHadoopUtil extends Logging { val baselineBytesRead = f() Some(() => f() - baselineBytesRead) } catch { - case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => { + case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e) None - } } } @@ -167,10 +173,9 @@ class SparkHadoopUtil extends Logging { val baselineBytesWritten = f() Some(() => f() - baselineBytesWritten) } catch { - case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => { + case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e) None - } } } @@ -308,7 +313,7 @@ class SparkHadoopUtil extends Logging { */ def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = { text match { - case HADOOP_CONF_PATTERN(matched) => { + case HADOOP_CONF_PATTERN(matched) => logDebug(text + " matched " + HADOOP_CONF_PATTERN) val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. } val eval = Option[String](hadoopConf.get(key)) @@ -323,11 +328,9 @@ class SparkHadoopUtil extends Logging { // Continue to substitute more variables. substituteHadoopVariables(eval.get, hadoopConf) } - } - case _ => { + case _ => logDebug(text + " didn't match " + HADOOP_CONF_PATTERN) text - } } } |