author     Matei Zaharia <matei@databricks.com>    2013-12-30 22:17:28 -0500
committer  Matei Zaharia <matei@databricks.com>    2013-12-30 22:17:28 -0500
commit     0fa5809768cf60ec62b4277f04e23a44dc1582e2 (patch)
tree       fee16620755769a70975c41d894db43633b18098 /core
parent     994f080f8ae3372366e6004600ba791c8a372ff0 (diff)
Updated docs for SparkConf and handled review comments
Diffstat (limited to 'core')
 -rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala                |  2
 -rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                  | 31
 -rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala               | 18
 -rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                   | 13
 -rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala  |  6
 -rw-r--r--  core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala  |  4
 -rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala          |  2
 -rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                 | 10
 -rw-r--r--  core/src/test/resources/spark.conf                                    |  2
 9 files changed, 56 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 7cb545a6be..31b0773bfe 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -52,7 +52,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
- if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
+ if (rdd.context.conf.contains("spark.default.parallelism")) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
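For context on this hunk: the change swaps a null-probing lookup for an explicit presence check on SparkConf. A minimal sketch of the two styles, not part of the patch (the key and value here are only illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false).set("spark.default.parallelism", "8")
    conf.contains("spark.default.parallelism")                 // new style: true
    conf.getOrElse("spark.default.parallelism", null) != null  // old style, same result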
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ae52de409e..96239cf4be 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -16,6 +16,12 @@ import com.typesafe.config.ConfigFactory
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
* get the same configuration no matter what is on the classpath.
*
+ * All setter methods in this class support chaining. For example, you can write
+ * `new SparkConf().setMaster("local").setAppName("My app")`.
+ *
+ * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
+ * by the user. Spark does not support modifying the configuration at runtime.
+ *
* @param loadDefaults whether to load values from the system properties and classpath
*/
class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
@@ -69,10 +75,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
def setJars(jars: Array[String]): SparkConf = {
- if (!jars.isEmpty) {
- settings("spark.jars") = jars.mkString(",")
- }
- this
+ setJars(jars.toSeq)
}
/**
@@ -102,15 +105,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
* (Java-friendly version.)
*/
def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
- for ((k, v) <- variables) {
- setExecutorEnv(k, v)
- }
- this
+ setExecutorEnv(variables.toSeq)
}
/**
- * Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
- * you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
+ * Set the location where Spark is installed on worker nodes.
*/
def setSparkHome(home: String): SparkConf = {
if (home != null) {
@@ -154,8 +153,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
- getAll.filter(pair => pair._1.startsWith(prefix))
- .map(pair => (pair._1.substring(prefix.length), pair._2))
+ getAll.filter{case (k, v) => k.startsWith(prefix)}
+ .map{case (k, v) => (k.substring(prefix.length), v)}
}
/** Does the configuration contain a given parameter? */
@@ -165,4 +164,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
override def clone: SparkConf = {
new SparkConf(false).setAll(settings)
}
+
+ /**
+ * Return a string listing all keys and values, one per line. This is useful to print the
+ * configuration out for debugging.
+ */
+ def toDebugString: String = {
+ settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+ }
}
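Taken together, the behavior documented above reads roughly like the following usage sketch (illustrative only; the app name, jar path, and environment variable are placeholders):

    import org.apache.spark.SparkConf

    // Every setter returns this SparkConf, so calls chain. Once the object is
    // passed to Spark it is cloned, and later edits have no effect.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My app")
      .setJars(Seq("target/my-app.jar"))             // placeholder jar path
      .setExecutorEnv(Seq(("MY_ENV_VAR", "1")))      // placeholder executor env variable
    println(conf.toDebugString)                      // sorted key=value pairs, one per line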
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 810ed1860b..8134ce7eb3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -55,14 +55,14 @@ import org.apache.spark.util._
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
- * @param conf_ a Spark Config object describing the application configuration. Any settings in
+ * @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
class SparkContext(
- conf_ : SparkConf,
+ config: SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
// a map from hostname to a list of input format splits on the host.
@@ -107,7 +107,13 @@ class SparkContext(
preferredNodeLocationData)
}
- val conf = conf_.clone()
+ private[spark] val conf = config.clone()
+
+ /**
+ * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
+ * changed at runtime.
+ */
+ def getConf: SparkConf = conf.clone()
if (!conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
@@ -135,11 +141,11 @@ class SparkContext(
initLogging()
// Create the Spark execution environment (cache, map output tracker, etc)
- private[spark] val env = SparkEnv.createFromSystemProperties(
+ private[spark] val env = SparkEnv.create(
+ conf,
"<driver>",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
- conf,
isDriver = true,
isLocal = isLocal)
SparkEnv.set(env)
@@ -730,7 +736,7 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
- if (conf.getOrElse("spark.home", null) != null) {
+ if (conf.contains("spark.home")) {
Some(conf.get("spark.home"))
} else if (System.getenv("SPARK_HOME") != null) {
Some(System.getenv("SPARK_HOME"))
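The new getConf accessor matches the comment above: callers receive a defensive copy rather than the live configuration. A brief sketch (illustrative; the extra setting is a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("conf-demo"))
    val copy = sc.getConf                  // clone of the context's SparkConf
    copy.set("spark.home", "/opt/spark")   // placeholder value; modifies only the copy, never the running context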
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 34fad3e763..d06af8e667 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
-class SparkEnv (
+class SparkEnv private[spark] (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
@@ -63,7 +63,7 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- def stop() {
+ private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
mapOutputTracker.stop()
@@ -79,6 +79,7 @@ class SparkEnv (
//actorSystem.awaitTermination()
}
+ private[spark]
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
@@ -111,11 +112,11 @@ object SparkEnv extends Logging {
env.get()
}
- def createFromSystemProperties(
+ private[spark] def create(
+ conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
- conf: SparkConf,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
@@ -129,7 +130,7 @@ object SparkEnv extends Logging {
}
// set only if unset until now.
- if (conf.getOrElse("spark.hostPort", null) == null) {
+ if (!conf.contains("spark.hostPort")) {
if (!isDriver){
// unexpected
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
@@ -216,7 +217,7 @@ object SparkEnv extends Logging {
}
// Warn about deprecated spark.cache.class property
- if (conf.getOrElse("spark.cache.class", null) != null) {
+ if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
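For reference, the renamed factory now takes the SparkConf as its first argument, and both it and the constructor are private[spark], so only Spark internals can call them. A hedged sketch of a driver-side call, mirroring the SparkContext change above (host and port are placeholders):

    import org.apache.spark.{SparkConf, SparkEnv}

    // Compiles only inside the org.apache.spark package, since create is private[spark].
    val conf = new SparkConf().setMaster("local").setAppName("env-demo")
    val env = SparkEnv.create(conf, "<driver>", "localhost", 0, isDriver = true, isLocal = true)
    SparkEnv.set(env)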
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e03cf9d13a..d6aeed7661 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -418,6 +418,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}
+
+ /**
+ * Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be
+ * changed at runtime.
+ */
+ def getConf: SparkConf = sc.getConf
}
object JavaSparkContext {
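The Java-friendly wrapper simply forwards to the underlying SparkContext, so it inherits the same copy-on-read behavior. A short illustrative sketch:

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext("local", "conf-demo")
    val conf: SparkConf = jsc.getConf   // again a clone; edits to it do not reach the running context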
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0aa8852649..4dfb19ed8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
if (sc != null) { sc.stop() }
- // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+ // Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty("spark.driver.port", "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
"docker ps -l -q".!(ProcessLogger(line => id = line))
new DockerId(id)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a6eabc462b..2400154648 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -109,7 +109,7 @@ private[spark] class Executor(
// Initialize Spark environment (using system properties read above)
private val env = {
if (!isLocal) {
- val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
+ val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
isDriver = false, isLocal = false)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b6b89cc7bb..ca3320b22b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -397,12 +397,11 @@ private[spark] object Utils extends Logging {
}
def localHostPort(conf: SparkConf): String = {
- val retval = conf.getOrElse("spark.hostPort", null)
+ val retval = conf.getOrElse("spark.hostPort", null)
if (retval == null) {
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
return localHostName()
}
-
retval
}
@@ -414,9 +413,12 @@ private[spark] object Utils extends Logging {
assert(hostPort.indexOf(':') != -1, message)
}
- // Used by DEBUG code : remove when all testing done
def logErrorWithStack(msg: String) {
- try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+ try {
+ throw new Exception
+ } catch {
+ case ex: Exception => logError(msg, ex)
+ }
}
// Typically, this will be of order of number of nodes in cluster
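Incidentally, localHostPort keeps its fallback behavior: when spark.hostPort is unset it logs an error (now through the tidied stack-trace helper) and falls back to the plain hostname. A hedged sketch of the two paths (Utils is private[spark]; the host:port value is a placeholder):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf(false)
    Utils.localHostPort(conf)                  // unset: logs the error and returns localHostName()
    conf.set("spark.hostPort", "node1:7078")   // placeholder host:port
    Utils.localHostPort(conf)                  // returns "node1:7078"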
diff --git a/core/src/test/resources/spark.conf b/core/src/test/resources/spark.conf
index 6c99bdcb7a..aa4e751235 100644
--- a/core/src/test/resources/spark.conf
+++ b/core/src/test/resources/spark.conf
@@ -1,3 +1,5 @@
+# A simple spark.conf file used only in our unit tests
+
spark.test.intTestProperty = 1
spark.test {