aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala42
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala48
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala19
-rw-r--r--docs/monitoring.md7
8 files changed, 124 insertions, 50 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 57b251ff47..72a452e0ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,14 +17,11 @@
package org.apache.spark.deploy
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
import java.util.jar.JarFile
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.SparkException
import org.apache.spark.util.Utils
/**
@@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
- val file = new File(filename)
- SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
- if (k.startsWith("spark")) {
+ Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
+ if (k.startsWith("spark.")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
} else {
@@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
- if (propertiesFile == null) {
- val sep = File.separator
- val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
- val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)
-
- confDir.foreach { sparkConfDir =>
- val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
- val file = new File(defaultPath)
- if (file.exists()) {
- propertiesFile = file.getAbsolutePath
- }
- }
- }
+ propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
val properties = HashMap[String, String]()
properties.putAll(defaultSparkProperties)
@@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.exitFn()
}
}
-
-object SparkSubmitArguments {
- /** Load properties present in the given file. */
- def getPropertiesFromFile(file: File): Seq[(String, String)] = {
- require(file.exists(), s"Properties file $file does not exist")
- require(file.isFile(), s"Properties file $file is not a normal file")
- val inputStream = new FileInputStream(file)
- try {
- val properties = new Properties()
- properties.load(inputStream)
- properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
- } catch {
- case e: IOException =>
- val message = s"Failed when loading Spark properties file $file"
- throw new SparkException(message, e)
- } finally {
- inputStream.close()
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index a64170a47b..0125330589 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
// Parse the properties file for the equivalent spark.driver.* configs
- val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+ val properties = Utils.getPropertiesFromFile(propertiesFile)
val confDriverMemory = properties.get("spark.driver.memory")
val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
val confClasspath = properties.get("spark.driver.extraClassPath")
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 25fc76c23e..5bce32a04d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,12 +18,14 @@
package org.apache.spark.deploy.history
import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
/**
* Command-line parser for the master.
*/
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
private var logDir: String = null
+ private var propertiesFile: String = null
parse(args.toList)
@@ -32,11 +34,16 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case ("--dir" | "-d") :: value :: tail =>
logDir = value
conf.set("spark.history.fs.logDirectory", value)
+ System.setProperty("spark.history.fs.logDirectory", value)
parse(tail)
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parse(tail)
+
case Nil =>
case _ =>
@@ -44,10 +51,17 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
}
}
+ // This mutates the SparkConf, so all accesses to it must be made after this line
+ Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
private def printUsageAndExit(exitCode: Int) {
System.err.println(
"""
- |Usage: HistoryServer
+ |Usage: HistoryServer [options]
+ |
+ |Options:
+ | --properties-file FILE Path to a custom Spark properties file.
+ | Default is conf/spark-defaults.conf.
|
|Configuration options can be set by setting the corresponding JVM system property.
|History Server options are always available; additional options depend on the provider.
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 4b0dbbe543..e34bee7854 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
+ var propertiesFile: String = null
// Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
@@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
+
+ parse(args.toList)
+
+ // This mutates the SparkConf, so all accesses to it must be made after this line
+ propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
if (conf.contains("spark.master.ui.port")) {
webUiPort = conf.get("spark.master.ui.port").toInt
}
- parse(args.toList)
-
def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)
- case ("--help" | "-h") :: tail =>
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parse(tail)
+
+ case ("--help") :: tail =>
printUsageAndExit(0)
case Nil => {}
@@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
- " --webui-port PORT Port for web UI (default: 8080)")
+ " --webui-port PORT Port for web UI (default: 8080)\n" +
+ " --properties-file FILE Path to a custom Spark properties file.\n" +
+ " Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 54e3937edd..019cd70f2a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
+ var propertiesFile: String = null
// Check for settings in environment variables
if (System.getenv("SPARK_WORKER_PORT") != null) {
@@ -47,15 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
}
- if (conf.contains("spark.worker.ui.port")) {
- webUiPort = conf.get("spark.worker.ui.port").toInt
- }
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}
parse(args.toList)
+ // This mutates the SparkConf, so all accesses to it must be made after this line
+ propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
+ if (conf.contains("spark.worker.ui.port")) {
+ webUiPort = conf.get("spark.worker.ui.port").toInt
+ }
+
checkWorkerMemory()
def parse(args: List[String]): Unit = args match {
@@ -89,7 +94,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)
- case ("--help" | "-h") :: tail =>
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parse(tail)
+
+ case ("--help") :: tail =>
printUsageAndExit(0)
case value :: tail =>
@@ -124,7 +133,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)\n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
- " --webui-port PORT Port for web UI (default: 8081)")
+ " --webui-port PORT Port for web UI (default: 8081)\n" +
+ " --properties-file FILE Path to a custom Spark properties file.\n" +
+ " Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index aad901620f..cbc4095065 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1410,6 +1410,54 @@ private[spark] object Utils extends Logging {
}
}
+ /**
+ * Load default Spark properties from the given file. If no file is provided,
+ * use the common defaults file. This mutates state in the given SparkConf and
+ * in this JVM's system properties if the config specified in the file is not
+ * already set. Return the path of the properties file used.
+ */
+ def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
+ val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
+ Option(path).foreach { confFile =>
+ getPropertiesFromFile(confFile).filter { case (k, v) =>
+ k.startsWith("spark.")
+ }.foreach { case (k, v) =>
+ conf.setIfMissing(k, v)
+ sys.props.getOrElseUpdate(k, v)
+ }
+ }
+ path
+ }
+
+ /** Load properties present in the given file. */
+ def getPropertiesFromFile(filename: String): Map[String, String] = {
+ val file = new File(filename)
+ require(file.exists(), s"Properties file $file does not exist")
+ require(file.isFile(), s"Properties file $file is not a normal file")
+
+ val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
+ try {
+ val properties = new Properties()
+ properties.load(inReader)
+ properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
+ } catch {
+ case e: IOException =>
+ throw new SparkException(s"Failed when loading Spark properties from $filename", e)
+ } finally {
+ inReader.close()
+ }
+ }
+
+ /** Return the path of the default Spark properties file. */
+ def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
+ env.get("SPARK_CONF_DIR")
+ .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
+ .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
+ .filter(_.isFile)
+ .map(_.getAbsolutePath)
+ .orNull
+ }
+
/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(e: Exception): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0344da60da..ea7ef0524d 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets
import com.google.common.io.Files
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
+
class UtilsSuite extends FunSuite {
test("bytesToString") {
@@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite {
assert(!tempFile2.exists())
}
+ test("loading properties from file") {
+ val outFile = File.createTempFile("test-load-spark-properties", "test")
+ try {
+ System.setProperty("spark.test.fileNameLoadB", "2")
+ Files.write("spark.test.fileNameLoadA true\n" +
+ "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
+ val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
+ properties
+ .filter { case (k, v) => k.startsWith("spark.")}
+ .foreach { case (k, v) => sys.props.getOrElseUpdate(k, v)}
+ val sparkConf = new SparkConf
+ assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
+ assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
+ } finally {
+ outFile.delete()
+ }
+ }
}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index d07ec4a57a..e3f81a76ac 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -78,6 +78,13 @@ follows:
file system.</td>
</tr>
<tr>
+ <td>spark.history.fs.logDirectory</td>
+ <td>(none)</td>
+ <td>
+ Directory that contains application event logs to be loaded by the history server
+ </td>
+ </tr>
+ <tr>
<td>spark.history.fs.updateInterval</td>
<td>10</td>
<td>