author     Matei Zaharia <matei@databricks.com>    2013-12-30 22:17:28 -0500
committer  Matei Zaharia <matei@databricks.com>    2013-12-30 22:17:28 -0500
commit     0fa5809768cf60ec62b4277f04e23a44dc1582e2 (patch)
tree       fee16620755769a70975c41d894db43633b18098 /core
parent     994f080f8ae3372366e6004600ba791c8a372ff0 (diff)
download   spark-0fa5809768cf60ec62b4277f04e23a44dc1582e2.tar.gz
           spark-0fa5809768cf60ec62b4277f04e23a44dc1582e2.tar.bz2
           spark-0fa5809768cf60ec62b4277f04e23a44dc1582e2.zip
Updated docs for SparkConf and handled review comments
Diffstat (limited to 'core')
9 files changed, 56 insertions, 32 deletions
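The main user-facing change in the diff below is new documentation on SparkConf: setter methods chain, a conf is cloned once it is handed to Spark (so later mutations have no effect), and a toDebugString helper is added. The following sketch is not part of the commit; it only illustrates that documented usage, assuming a local master URL and placeholder keys and values.

    import org.apache.spark.{SparkConf, SparkContext}

    object ConfExample {
      def main(args: Array[String]) {
        // Setter methods chain, as the new SparkConf scaladoc describes.
        val conf = new SparkConf()
          .setMaster("local")                        // placeholder master URL
          .setAppName("My app")
          .set("spark.executorEnv.PATH", "/opt/bin") // illustrative key/value only

        // Once passed to SparkContext the conf is cloned; changing `conf`
        // afterwards does not affect the running context.
        val sc = new SparkContext(conf)

        // toDebugString lists every key=value pair, one per line, for debugging.
        println(sc.getConf.toDebugString)

        sc.stop()
      }
    }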
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 7cb545a6be..31b0773bfe 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -52,7 +52,7 @@ object Partitioner {
     for (r <- bySize if r.partitioner != None) {
       return r.partitioner.get
     }
-    if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
+    if (rdd.context.conf.contains("spark.default.parallelism")) {
       return new HashPartitioner(rdd.context.defaultParallelism)
     } else {
       return new HashPartitioner(bySize.head.partitions.size)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ae52de409e..96239cf4be 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -16,6 +16,12 @@ import com.typesafe.config.ConfigFactory
  * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
  * get the same configuration no matter what is on the classpath.
  *
+ * All setter methods in this class support chaining. For example, you can write
+ * `new SparkConf().setMaster("local").setAppName("My app")`.
+ *
+ * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
+ * by the user. Spark does not support modifying the configuration at runtime.
+ *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
 class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
@@ -69,10 +75,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
 
   /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
   def setJars(jars: Array[String]): SparkConf = {
-    if (!jars.isEmpty) {
-      settings("spark.jars") = jars.mkString(",")
-    }
-    this
+    setJars(jars.toSeq)
   }
 
   /**
@@ -102,15 +105,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
    * (Java-friendly version.)
    */
   def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
-    for ((k, v) <- variables) {
-      setExecutorEnv(k, v)
-    }
-    this
+    setExecutorEnv(variables.toSeq)
   }
 
   /**
-   * Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
-   * you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
+   * Set the location where Spark is installed on worker nodes.
    */
   def setSparkHome(home: String): SparkConf = {
     if (home != null) {
@@ -154,8 +153,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
-    getAll.filter(pair => pair._1.startsWith(prefix))
-          .map(pair => (pair._1.substring(prefix.length), pair._2))
+    getAll.filter{case (k, v) => k.startsWith(prefix)}
+          .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
   /** Does the configuration contain a given parameter? */
@@ -165,4 +164,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
   override def clone: SparkConf = {
     new SparkConf(false).setAll(settings)
   }
+
+  /**
+   * Return a string listing all keys and values, one per line. This is useful to print the
+   * configuration out for debugging.
+   */
+  def toDebugString: String = {
+    settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 810ed1860b..8134ce7eb3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -55,14 +55,14 @@ import org.apache.spark.util._
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
- * @param conf_ a Spark Config object describing the application configuration. Any settings in
+ * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
  *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
  *   from a list of input files or InputFormats for the application.
  */
 class SparkContext(
-    conf_ : SparkConf,
+    config: SparkConf,
     // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
     // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
     // a map from hostname to a list of input format splits on the host.
@@ -107,7 +107,13 @@ class SparkContext(
       preferredNodeLocationData)
   }
 
-  val conf = conf_.clone()
+  private[spark] val conf = config.clone()
+
+  /**
+   * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
+   * changed at runtime.
+   */
+  def getConf: SparkConf = conf.clone()
 
   if (!conf.contains("spark.master")) {
     throw new SparkException("A master URL must be set in your configuration")
@@ -135,11 +141,11 @@ class SparkContext(
   initLogging()
 
   // Create the Spark execution environment (cache, map output tracker, etc)
-  private[spark] val env = SparkEnv.createFromSystemProperties(
+  private[spark] val env = SparkEnv.create(
+    conf,
     "<driver>",
     conf.get("spark.driver.host"),
     conf.get("spark.driver.port").toInt,
-    conf,
     isDriver = true,
     isLocal = isLocal)
   SparkEnv.set(env)
@@ -730,7 +736,7 @@ class SparkContext(
    * (in that order of preference). If neither of these is set, return None.
    */
   private[spark] def getSparkHome(): Option[String] = {
-    if (conf.getOrElse("spark.home", null) != null) {
+    if (conf.contains("spark.home")) {
       Some(conf.get("spark.home"))
     } else if (System.getenv("SPARK_HOME") != null) {
       Some(System.getenv("SPARK_HOME"))
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 34fad3e763..d06af8e667 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
  * objects needs to have the right SparkEnv set. You can get the current environment with
  * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
  */
-class SparkEnv (
+class SparkEnv private[spark] (
     val executorId: String,
     val actorSystem: ActorSystem,
     val serializerManager: SerializerManager,
@@ -63,7 +63,7 @@ class SparkEnv (
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  def stop() {
+  private[spark] def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
     mapOutputTracker.stop()
@@ -79,6 +79,7 @@ class SparkEnv (
     //actorSystem.awaitTermination()
   }
 
+  private[spark]
   def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
     synchronized {
       val key = (pythonExec, envVars)
@@ -111,11 +112,11 @@ object SparkEnv extends Logging {
     env.get()
   }
 
-  def createFromSystemProperties(
+  private[spark] def create(
+      conf: SparkConf,
       executorId: String,
       hostname: String,
       port: Int,
-      conf: SparkConf,
       isDriver: Boolean,
       isLocal: Boolean): SparkEnv = {
 
@@ -129,7 +130,7 @@ object SparkEnv extends Logging {
     }
 
     // set only if unset until now.
-    if (conf.getOrElse("spark.hostPort", null) == null) {
+    if (!conf.contains("spark.hostPort")) {
       if (!isDriver){
         // unexpected
         Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
@@ -216,7 +217,7 @@ object SparkEnv extends Logging {
     }
 
     // Warn about deprecated spark.cache.class property
-    if (conf.getOrElse("spark.cache.class", null) != null) {
+    if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +
         "levels using the RDD.persist() method instead.")
     }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e03cf9d13a..d6aeed7661 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -418,6 +418,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     new JavaRDD(sc.checkpointFile(path))
   }
+
+  /**
+   * Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be
+   * changed at runtime.
+   */
+  def getConf: SparkConf = sc.getConf
 }
 
 object JavaSparkContext {
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0aa8852649..4dfb19ed8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
   def createClient() = {
     if (sc != null) { sc.stop() }
-    // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+    // Counter-hack: Because of a hack in SparkEnv#create() that changes this
    // property, we need to reset it.
     System.setProperty("spark.driver.port", "0")
     sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
     "docker ps -l -q".!(ProcessLogger(line => id = line))
     new DockerId(id)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a6eabc462b..2400154648 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -109,7 +109,7 @@ private[spark] class Executor(
   // Initialize Spark environment (using system properties read above)
   private val env = {
     if (!isLocal) {
-      val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
+      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
         isDriver = false, isLocal = false)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b6b89cc7bb..ca3320b22b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -397,12 +397,11 @@ private[spark] object Utils extends Logging {
   }
 
   def localHostPort(conf: SparkConf): String = {
-    val retval = conf.getOrElse("spark.hostPort", null) 
+    val retval = conf.getOrElse("spark.hostPort", null)
     if (retval == null) {
       logErrorWithStack("spark.hostPort not set but invoking localHostPort")
       return localHostName()
     }
-
     retval
   }
 
@@ -414,9 +413,12 @@ private[spark] object Utils extends Logging {
     assert(hostPort.indexOf(':') != -1, message)
   }
 
-  // Used by DEBUG code : remove when all testing done
   def logErrorWithStack(msg: String) {
-    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+    try {
+      throw new Exception
+    } catch {
+      case ex: Exception => logError(msg, ex)
+    }
   }
 
   // Typically, this will be of order of number of nodes in cluster
diff --git a/core/src/test/resources/spark.conf b/core/src/test/resources/spark.conf
index 6c99bdcb7a..aa4e751235 100644
--- a/core/src/test/resources/spark.conf
+++ b/core/src/test/resources/spark.conf
@@ -1,3 +1,5 @@
+# A simple spark.conf file used only in our unit tests
+
 spark.test.intTestProperty = 1
 
 spark.test {
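Several call sites above replace conf.getOrElse(key, null) != null with the existing contains(key) check, and both SparkContext and JavaSparkContext gain a getConf accessor that returns a defensive copy. The sketch below, which is not part of the commit, shows how those read from application code; the object name, master URL, and parallelism value are illustrative only.

    import org.apache.spark.{SparkConf, SparkContext}

    object GetConfExample {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("conf-demo"))

        // getConf hands back a copy; mutating the copy does not change the live context.
        val copy = sc.getConf
        copy.set("spark.default.parallelism", "8")  // affects only the copy

        // contains() is how core code now asks "was this key set?"; for example,
        // Partitioner falls back to the largest parent RDD's partition count
        // when spark.default.parallelism is absent.
        if (sc.getConf.contains("spark.default.parallelism")) {
          println("explicit default parallelism: " + sc.getConf.get("spark.default.parallelism"))
        } else {
          println("spark.default.parallelism not set")
        }

        sc.stop()
      }
    }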