aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-28 17:13:15 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-28 17:13:15 -0500
commit642029e7f43322f84abe4f7f36bb0b1b95d8101d (patch)
treecef080193815b279b99a8b35f2401873a3ea3eb1 /core
parent2573add94cf920a88f74d80d8ea94218d812704d (diff)
downloadspark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.gz
spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.bz2
spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.zip
Various fixes to configuration code
- Got rid of global SparkContext.globalConf - Pass SparkConf to serializers and compression codecs - Made SparkConf public instead of private[spark] - Improved API of SparkContext and SparkConf - Switched executor environment vars to be passed through SparkConf - Fixed some places that were still using system properties - Fixed some tests, though others are still failing This still fails several tests in core, repl and streaming, likely due to properties not being set or cleared correctly (some of the tests run fine in isolation).
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulators.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala158
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala138
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala33
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala38
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/network/ReceiverTest.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/network/SenderTest.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala46
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala33
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala2
69 files changed, 530 insertions, 387 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 6e922a612a..5f73d234aa 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -41,7 +41,7 @@ class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
extends Serializable {
-
+
val id = Accumulators.newId
@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
@@ -113,7 +113,7 @@ class Accumulable[R, T] (
def setValue(newValue: R) {
this.value = newValue
}
-
+
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
@@ -177,7 +177,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
def zero(initialValue: R): R = {
// We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
// Instead we'll serialize it to a buffer and load it back.
- val ser = new JavaSerializer().newInstance()
+ val ser = new JavaSerializer(new SparkConf(false)).newInstance()
val copy = ser.deserialize[R](ser.serialize(initialValue))
copy.clear() // In case it contained stuff
copy
@@ -215,7 +215,7 @@ private object Accumulators {
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0
-
+
def newId: Long = synchronized {
lastId += 1
return lastId
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4520edb10d..cdae167aef 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -65,7 +65,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
protected val epochLock = new java.lang.Object
private val metadataCleaner =
- new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)
+ new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
@@ -129,7 +129,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val hostPort = Utils.localHostPort()
+ val hostPort = Utils.localHostPort(conf)
// This try-finally prevents hangs due to timeouts:
try {
val fetchedBytes =
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 04c1eedfeb..7cb545a6be 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -32,8 +32,6 @@ abstract class Partitioner extends Serializable {
}
object Partitioner {
-
- import SparkContext.{globalConf => conf}
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
@@ -54,7 +52,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
- if (conf.getOrElse("spark.default.parallelism", null) != null) {
+ if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
@@ -92,7 +90,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
class RangePartitioner[K <% Ordered[K]: ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
- private val ascending: Boolean = true)
+ private val ascending: Boolean = true)
extends Partitioner {
// An array of upper bounds for the first (partitions - 1) partitions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 9a4eefad2e..185ddb1fe5 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -1,71 +1,159 @@
package org.apache.spark
-import scala.collection.JavaConversions._
-import scala.collection.concurrent.TrieMap
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
import com.typesafe.config.ConfigFactory
-private[spark] class SparkConf(loadClasspathRes: Boolean = true) extends Serializable {
- @transient lazy val config = ConfigFactory.systemProperties()
- .withFallback(ConfigFactory.parseResources("spark.conf"))
- // TODO this should actually be synchronized
- private val configMap = TrieMap[String, String]()
+/**
+ * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
+ *
+ * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
+ * values from both the `spark.*` Java system properties and any `spark.conf` on your application's
+ * classpath (if it has one). In this case, system properties take priority over `spark.conf`, and
+ * any parameters you set directly on the `SparkConf` object take priority over both of those.
+ *
+ * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
+ * get the same configuration no matter what is on the classpath.
+ *
+ * @param loadDefaults whether to load values from the system properties and classpath
+ */
+class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
- if (loadClasspathRes && !config.entrySet().isEmpty) {
- for (e <- config.entrySet()) {
- configMap += ((e.getKey, e.getValue.unwrapped().toString))
+ /** Create a SparkConf that loads defaults from system properties and the classpath */
+ def this() = this(true)
+
+ private val settings = new HashMap[String, String]()
+
+ if (loadDefaults) {
+ val typesafeConfig = ConfigFactory.systemProperties()
+ .withFallback(ConfigFactory.parseResources("spark.conf"))
+ for (e <- typesafeConfig.entrySet().asScala) {
+ settings(e.getKey) = e.getValue.unwrapped.toString
}
}
- def setMasterUrl(master: String) = {
- if (master != null)
- configMap += (("spark.master", master))
+ /** Set a configuration variable. */
+ def set(key: String, value: String): SparkConf = {
+ settings(key) = value
+ this
+ }
+
+ /**
+ * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
+ * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
+ */
+ def setMaster(master: String): SparkConf = {
+ if (master != null) {
+ settings("spark.master") = master
+ }
this
}
- def setAppName(name: String) = {
- if (name != null)
- configMap += (("spark.appName", name))
+ /** Set a name for your application. Shown in the Spark web UI. */
+ def setAppName(name: String): SparkConf = {
+ if (name != null) {
+ settings("spark.appName") = name
+ }
this
}
- def setJars(jars: Seq[String]) = {
- if (!jars.isEmpty)
- configMap += (("spark.jars", jars.mkString(",")))
+ /** Set JAR files to distribute to the cluster. */
+ def setJars(jars: Seq[String]): SparkConf = {
+ if (!jars.isEmpty) {
+ settings("spark.jars") = jars.mkString(",")
+ }
this
}
- def set(k: String, value: String) = {
- configMap += ((k, value))
+ /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
+ def setJars(jars: Array[String]): SparkConf = {
+ if (!jars.isEmpty) {
+ settings("spark.jars") = jars.mkString(",")
+ }
this
}
- def setSparkHome(home: String) = {
- if (home != null)
- configMap += (("spark.home", home))
+ /** Set an environment variable to be used when launching executors for this application. */
+ def setExecutorEnv(variable: String, value: String): SparkConf = {
+ settings("spark.executorEnv." + variable) = value
this
}
- def set(map: Seq[(String, String)]) = {
- if (map != null && !map.isEmpty)
- configMap ++= map
+ /** Set multiple environment variables to be used when launching executors. */
+ def setExecutorEnv(variables: Seq[(String, String)]): SparkConf = {
+ for ((k, v) <- variables) {
+ setExecutorEnv(k, v)
+ }
this
}
- def get(k: String): String = {
- configMap(k)
+ /**
+ * Set multiple environment variables to be used when launching executors.
+ * (Java-friendly version.)
+ */
+ def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
+ for ((k, v) <- variables) {
+ setExecutorEnv(k, v)
+ }
+ this
}
- def getAllConfiguration = configMap.clone.entrySet().iterator
+ /**
+ * Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
+ * you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
+ */
+ def setSparkHome(home: String): SparkConf = {
+ if (home != null) {
+ settings("spark.home") = home
+ }
+ this
+ }
+ /** Set multiple parameters together */
+ def setAll(settings: Traversable[(String, String)]) = {
+ this.settings ++= settings
+ this
+ }
+
+ /** Set a parameter if it isn't already configured */
+ def setIfMissing(key: String, value: String): SparkConf = {
+ if (!settings.contains(key)) {
+ settings(key) = value
+ }
+ this
+ }
+
+ /** Get a parameter; throws an exception if it's not set */
+ def get(key: String): String = {
+ settings(key)
+ }
+
+ /** Get a parameter as an Option */
+ def getOption(key: String): Option[String] = {
+ settings.get(key)
+ }
+
+ /** Get all parameters as a list of pairs */
+ def getAll: Seq[(String, String)] = settings.clone().toSeq
+
+ /** Get a parameter, falling back to a default if not set */
def getOrElse(k: String, defaultValue: String): String = {
- configMap.getOrElse(k, defaultValue)
+ settings.getOrElse(k, defaultValue)
}
- override def clone: SparkConf = {
- val conf = new SparkConf(false)
- conf.set(configMap.toSeq)
- conf
+ /** Get all executor environment variables set on this SparkConf */
+ def getExecutorEnv: Seq[(String, String)] = {
+ val prefix = "spark.executorEnv."
+ getAll.filter(pair => pair._1.startsWith(prefix))
+ .map(pair => (pair._1.substring(prefix.length), pair._2))
}
+ /** Does the configuration contain a given parameter? */
+ def contains(key: String): Boolean = settings.contains(key)
+
+ /** Copy this object */
+ override def clone: SparkConf = {
+ new SparkConf(false).setAll(settings)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4300b07bdb..0567f7f437 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,8 +22,7 @@ import java.net.URI
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.{Map, immutable}
-import scala.collection.JavaConversions._
+import scala.collection.{Map, Set, immutable}
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -57,23 +56,32 @@ import org.apache.spark.util._
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
- * @param conf a Spark Config object describing the context configuration. Any settings in this
- * config overrides the default configs as well as system properties.
- *
- * @param environment Environment variables to set on worker nodes.
+ * @param conf_ a Spark Config object describing the application configuration. Any settings in
+ * this config overrides the default configs as well as system properties.
+ * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
+ * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+ * from a list of input files or InputFormats for the application.
*/
class SparkContext(
- val conf: SparkConf,
- val environment: Map[String, String] = Map(),
+ conf_ : SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
- // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
- // of data-local splits on host
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = immutable.Map())
+ // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
+ // a map from hostname to a list of input format splits on the host.
+ val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
extends Logging {
/**
- * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
- * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
+ * Alternative constructor that allows setting common Spark properties directly
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
+ */
+ def this(master: String, appName: String, conf: SparkConf) =
+ this(conf.setMaster(master).setAppName(appName))
+
+ /**
+ * Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
@@ -82,24 +90,42 @@ class SparkContext(
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
*/
- def this(master: String, appName: String, sparkHome: String = null,
- jars: Seq[String] = Nil, environment: Map[String, String] = Map(),
- preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- immutable.Map()) =
- this(new SparkConf(false).setAppName(appName).setMasterUrl(master)
- .setJars(jars).set(environment.toSeq).setSparkHome(sparkHome),
- environment, preferredNodeLocationData)
+ def this(
+ master: String,
+ appName: String,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map(),
+ preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
+ {
+ this(
+ new SparkConf()
+ .setMaster(master)
+ .setAppName(appName)
+ .setJars(jars)
+ .setExecutorEnv(environment.toSeq)
+ .setSparkHome(sparkHome),
+ preferredNodeLocationData)
+ }
- // Set Spark driver host and port system properties
- Try(conf.get("spark.driver.host"))
- .getOrElse(conf.set("spark.driver.host", Utils.localHostName()))
+ val conf = conf_.clone()
+
+ if (!conf.contains("spark.master")) {
+ throw new SparkException("A master URL must be set in your configuration")
+ }
+ if (!conf.contains("spark.appName")) {
+ throw new SparkException("An application must be set in your configuration")
+ }
- Try(conf.get("spark.driver.port"))
- .getOrElse(conf.set("spark.driver.port", "0"))
+ // Set Spark driver host and port system properties
+ conf.setIfMissing("spark.driver.host", Utils.localHostName())
+ conf.setIfMissing("spark.driver.port", "0")
- val jars: Seq[String] = if (conf.getOrElse("spark.jars", null) != null) {
- conf.get("spark.jars").split(",")
- } else null
+ val jars: Seq[String] = if (conf.contains("spark.jars")) {
+ conf.get("spark.jars").split(",").filter(_.size != 0)
+ } else {
+ null
+ }
val master = conf.get("spark.master")
val appName = conf.get("spark.appName")
@@ -115,8 +141,8 @@ class SparkContext(
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
conf,
- true,
- isLocal)
+ isDriver = true,
+ isLocal = isLocal)
SparkEnv.set(env)
// Used to store a URL for each static file/jar together with the file's local timestamp
@@ -125,7 +151,8 @@ class SparkContext(
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
- private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
+ private[spark] val metadataCleaner =
+ new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
// Initialize the Spark UI
private[spark] val ui = new SparkUI(this)
@@ -135,9 +162,14 @@ class SparkContext(
// Add each JAR given through the constructor
if (jars != null) {
- jars.foreach { addJar(_) }
+ jars.foreach(addJar)
}
+ private[spark] val executorMemory = conf.getOption("spark.executor.memory")
+ .orElse(Option(System.getenv("SPARK_MEM")))
+ .map(Utils.memoryStringToMb)
+ .getOrElse(512)
+
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
@@ -148,10 +180,8 @@ class SparkContext(
}
}
// Since memory can be set with a system property too, use that
- executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
- if (environment != null) {
- executorEnvs ++= environment
- }
+ executorEnvs("SPARK_MEM") = executorMemory + "m"
+ executorEnvs ++= conf.getExecutorEnv
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
@@ -183,12 +213,12 @@ class SparkContext(
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- Utils.getSystemProperties.foreach { case (key, value) =>
+ conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
- val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
hadoopConf
}
@@ -200,7 +230,7 @@ class SparkContext(
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}
- private[spark] def getLocalProperties(): Properties = localProperties.get()
+ private[spark] def getLocalProperties: Properties = localProperties.get()
private[spark] def setLocalProperties(props: Properties) {
localProperties.set(props)
@@ -533,7 +563,7 @@ class SparkContext(
// Fetch the file locally in case a job is executed locally.
// Jobs that run through LocalScheduler will already fetch the required dependencies,
// but jobs run in DAGScheduler.runLocally() will not so we must fetch the files here.
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory))
+ Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf)
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
@@ -915,14 +945,6 @@ object SparkContext {
private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
- private lazy val conf = new SparkConf()
-
- private[spark] def globalConf = {
- if (SparkEnv.get != null) {
- SparkEnv.get.conf
- } else conf
- }
-
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
@@ -1031,18 +1053,10 @@ object SparkContext {
/** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
- /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
- private[spark] val executorMemoryRequested = {
- // TODO: Might need to add some extra memory for the non-heap parts of the JVM
- Try(globalConf.get("spark.executor.memory")).toOption
- .orElse(Option(System.getenv("SPARK_MEM")))
- .map(Utils.memoryStringToMb)
- .getOrElse(512)
- }
-
// Creates a task scheduler based on a given master URL. Extracted for testing.
- private
- def createTaskScheduler(sc: SparkContext, master: String, appName: String): TaskScheduler = {
+ private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
+ : TaskScheduler =
+ {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1076,10 +1090,10 @@ object SparkContext {
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
- if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
+ if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
- memoryPerSlaveInt, SparkContext.executorMemoryRequested))
+ memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new ClusterScheduler(sc)
@@ -1137,7 +1151,7 @@ object SparkContext {
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(sc)
- val coarseGrained = globalConf.getOrElse("spark.mesos.coarse", "false").toBoolean
+ val coarseGrained = sc.conf.getOrElse("spark.mesos.coarse", "false").toBoolean
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 78e4ae27b2..34fad3e763 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -107,7 +107,7 @@ object SparkEnv extends Logging {
/**
* Returns the ThreadLocal SparkEnv.
*/
- def getThreadLocal : SparkEnv = {
+ def getThreadLocal: SparkEnv = {
env.get()
}
@@ -150,18 +150,19 @@ object SparkEnv extends Logging {
val serializerManager = new SerializerManager
val serializer = serializerManager.setDefault(
- conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
val closureSerializer = serializerManager.get(
- conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
+ conf)
def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
if (isDriver) {
logInfo("Registering " + name)
Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
- val driverHost: String = conf.getOrElse("spark.driver.host", "localhost")
- val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt
+ val driverHost: String = conf.getOrElse("spark.driver.host", "localhost")
+ val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index acf328aa6a..e03cf9d13a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -29,17 +29,22 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import com.google.common.base.Optional
-import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, SparkContext}
+import org.apache.spark._
import org.apache.spark.SparkContext.IntAccumulatorParam
import org.apache.spark.SparkContext.DoubleAccumulatorParam
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+import scala.Tuple2
/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and
* works with Java collections instead of Scala ones.
*/
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
+ /**
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+ */
+ def this(conf: SparkConf) = this(new SparkContext(conf))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
@@ -50,6 +55,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
+ */
+ def this(master: String, appName: String, conf: SparkConf) =
+ this(conf.setMaster(master).setAppName(appName))
+
+ /**
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jarFile JAR file to send to the cluster. This can be a path on the local file system
* or an HDFS, HTTP, HTTPS, or FTP URL.
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d6eacfe23e..05fd824254 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {
- val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
override def getPartitions = parent.partitions
@@ -247,10 +247,10 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
*/
private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
- import SparkContext.{globalConf => conf}
+
Utils.checkHost(serverHost, "Expected hostname")
- val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = SparkEnv.get.conf.getOrElse("spark.buffer.size", "65536").toInt
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index cecb8c228b..47528bcee8 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
-
+
def value = value_
def blockId = BroadcastBlockId(id)
@@ -40,7 +40,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
}
- if (!isLocal) {
+ if (!isLocal) {
HttpBroadcast.write(id, value_)
}
@@ -81,41 +81,48 @@ private object HttpBroadcast extends Logging {
private var serverUri: String = null
private var server: HttpServer = null
+ // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
private val files = new TimeStampedHashSet[String]
- private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
+ private var cleaner: MetadataCleaner = null
- private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
+ private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
- private lazy val compressionCodec = CompressionCodec.createCodec()
+ private var compressionCodec: CompressionCodec = null
def initialize(isDriver: Boolean, conf: SparkConf) {
synchronized {
if (!initialized) {
- bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
- compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
+ bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
if (isDriver) {
- createServer()
+ createServer(conf)
conf.set("spark.httpBroadcast.uri", serverUri)
}
serverUri = conf.get("spark.httpBroadcast.uri")
+ cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
+ compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
}
}
}
-
+
def stop() {
synchronized {
if (server != null) {
server.stop()
server = null
}
+ if (cleaner != null) {
+ cleaner.cancel()
+ cleaner = null
+ }
+ compressionCodec = null
initialized = false
- cleaner.cancel()
}
}
- private def createServer() {
- broadcastDir = Utils.createTempDir(Utils.getLocalDir)
+ private def createServer(conf: SparkConf) {
+ broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
server = new HttpServer(broadcastDir)
server.start()
serverUri = server.uri
@@ -143,7 +150,7 @@ private object HttpBroadcast extends Logging {
val in = {
val httpConnection = new URL(url).openConnection()
httpConnection.setReadTimeout(httpReadTimeout)
- val inputStream = httpConnection.getInputStream()
+ val inputStream = httpConnection.getInputStream
if (compress) {
compressionCodec.compressedInputStream(inputStream)
} else {
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 4a3801dc48..00ec3b971b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -83,13 +83,13 @@ extends Broadcast[T](id) with Logging with Serializable {
case None =>
val start = System.nanoTime
logInfo("Started reading broadcast variable " + id)
-
+
// Initialize @transient variables that will receive garbage values from the master.
resetWorkerVariables()
if (receiveBroadcast(id)) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
-
+
// Store the merged copy in cache so that the next worker doesn't need to rebuild it.
// This creates a tradeoff between memory usage and latency.
// Storing copy doubles the memory footprint; not storing doubles deserialization cost.
@@ -122,14 +122,14 @@ extends Broadcast[T](id) with Logging with Serializable {
while (attemptId > 0 && totalBlocks == -1) {
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(metaId) match {
- case Some(x) =>
+ case Some(x) =>
val tInfo = x.asInstanceOf[TorrentInfo]
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
hasBlocks = 0
-
- case None =>
+
+ case None =>
Thread.sleep(500)
}
}
@@ -145,13 +145,13 @@ extends Broadcast[T](id) with Logging with Serializable {
val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(pieceId) match {
- case Some(x) =>
+ case Some(x) =>
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
hasBlocks += 1
SparkEnv.get.blockManager.putSingle(
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, true)
-
- case None =>
+
+ case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
}
@@ -175,13 +175,13 @@ extends Logging {
}
}
}
-
+
def stop() {
initialized = false
}
- lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024
-
+ lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024
+
def blockifyObject[T](obj: T): TorrentInfo = {
val byteArray = Utils.serialize[T](obj)
val bais = new ByteArrayInputStream(byteArray)
@@ -210,7 +210,7 @@ extends Logging {
}
def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
- totalBytes: Int,
+ totalBytes: Int,
totalBlocks: Int): T = {
var retByteArray = new Array[Byte](totalBytes)
for (i <- 0 until totalBlocks) {
@@ -223,22 +223,22 @@ extends Logging {
}
private[spark] case class TorrentBlock(
- blockID: Int,
- byteArray: Array[Byte])
+ blockID: Int,
+ byteArray: Array[Byte])
extends Serializable
private[spark] case class TorrentInfo(
@transient arrayOfBlocks : Array[TorrentBlock],
- totalBlocks: Int,
- totalBytes: Int)
+ totalBlocks: Int,
+ totalBytes: Int)
extends Serializable {
-
- @transient var hasBlocks = 0
+
+ @transient var hasBlocks = 0
}
private[spark] class TorrentBroadcastFactory
extends BroadcastFactory {
-
+
def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index dda43dc018..19d393a0db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -26,7 +26,7 @@ private[spark] class ApplicationDescription(
val appUiUrl: String)
extends Serializable {
- val user = System.getProperty("user.name", "<unknown>")
+ val user = System.getProperty("user.name", "<unknown>")
override def toString: String = "ApplicationDescription(" + name + ")"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 59d12a3e6f..ffc0cb0903 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.Utils
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import scala.collection.mutable.ArrayBuffer
@@ -43,7 +43,8 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
- val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
+ val conf = new SparkConf(false)
+ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
val masters = Array(masterUrl)
@@ -55,7 +56,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
workerActorSystems += workerSystem
}
- return masters
+ masters
}
def stop() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 1c979ac3e0..4f402c1121 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,10 +34,10 @@ class SparkHadoopUtil {
UserGroupInformation.setConfiguration(conf)
def runAsUser(user: String)(func: () => Unit) {
- // if we are already running as the user intended there is no reason to do the doAs. It
+ // if we are already running as the user intended there is no reason to do the doAs. It
// will actually break secure HDFS access as it doesn't fill in the credentials. Also if
- // the user is UNKNOWN then we shouldn't be creating a remote unknown user
- // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
+ // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+ // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
// in SparkContext.
val currentUser = Option(System.getProperty("user.name")).
getOrElse(SparkContext.SPARK_UNKNOWN_USER)
@@ -67,12 +67,14 @@ class SparkHadoopUtil {
}
object SparkHadoopUtil {
- import SparkContext.{globalConf => conf}
+
private val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(conf.getOrElse("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ val yarnMode = java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))
if (yarnMode) {
try {
- Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+ Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+ .newInstance()
+ .asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 426cf524ae..ef649fd80c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.client
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.deploy.{Command, ApplicationDescription}
private[spark] object TestClient {
@@ -46,11 +46,12 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
- conf = SparkContext.globalConf)
+ conf = new SparkConf)
val desc = new ApplicationDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
+ "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+ "dummy-spark-home", "ignored")
val listener = new TestListener
- val client = new Client(actorSystem, Array(url), desc, listener, SparkContext.globalConf)
+ val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2c162c4fa2..9c89e36b14 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import org.apache.spark.{SparkContext, Logging, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
@@ -38,14 +38,16 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
- import context.dispatcher
- val conf = SparkContext.globalConf
+ import context.dispatcher // to use Akka's scheduler.schedule()
+
+ val conf = new SparkConf
+
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt
- val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "")
- val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE")
+ val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt
+ val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -86,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -495,7 +497,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
- workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
@@ -507,8 +509,9 @@ private[spark] object Master {
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
- val args = new MasterArguments(argStrings, SparkContext.globalConf)
- val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+ val conf = new SparkConf
+ val args = new MasterArguments(argStrings, conf)
+ val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
actorSystem.awaitTermination()
}
@@ -522,11 +525,12 @@ private[spark] object Master {
}
}
- def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
- conf = SparkContext.globalConf)
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
+ : (ActorSystem, Int, Int) =
+ {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
- val timeout = AkkaUtils.askTimeout(SparkContext.globalConf)
+ val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 79d95b1a83..60c7a7c2d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -37,7 +37,7 @@ import org.apache.spark.{SparkConf, Logging}
*/
private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
conf: SparkConf) extends Logging {
- val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "")
+ val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
val ZK_TIMEOUT_MILLIS = 30000
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index df5bb368a2..a61597bbdf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -28,7 +28,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
masterUrl: String, conf: SparkConf)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
- val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+ val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private val watcher = new ZooKeeperWatcher()
private val zk = new SparkZooKeeperSession(this, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index c55b720422..245a558a59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -27,7 +27,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
with SparkZooKeeperWatcher
with Logging
{
- val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
val zk = new SparkZooKeeperSession(this, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 75a6e75c78..f844fcbbfc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -55,7 +55,7 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
- val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4
+ val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -267,7 +267,7 @@ private[spark] class Worker(
}
private[spark] object Worker {
- import org.apache.spark.SparkContext.globalConf
+
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
@@ -276,14 +276,16 @@ private[spark] object Worker {
}
def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
- masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
- : (ActorSystem, Int) = {
+ masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
+ : (ActorSystem, Int) =
+ {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+ val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
- conf = globalConf)
+ conf = conf)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir, globalConf), name = "Worker")
+ masterUrls, workDir, conf), name = "Worker")
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c8319f6f6e..53a2b94a52 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -98,7 +98,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
- indestructible = true, conf = SparkContext.globalConf)
+ indestructible = true, conf = new SparkConf)
// set it
val sparkHostPort = hostname + ":" + boundPort
// conf.set("spark.hostPort", sparkHostPort)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 70fc30e993..a6eabc462b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -57,17 +57,18 @@ private[spark] class Executor(
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)
+
+ // Set spark.* properties from executor arg
val conf = new SparkConf(false)
- // Set spark.* system properties from executor arg
- for ((key, value) <- properties) {
- conf.set(key, value)
- }
+ conf.setAll(properties)
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
- if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
- conf.set("spark.local.dir", getYarnLocalDirs())
+ if (java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))))
+ {
+ conf.set("spark.local.dir", getYarnLocalDirs())
}
// Create our ClassLoader and set it on this thread
@@ -331,12 +332,12 @@ private[spark] class Executor(
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 8ef5019b6c..20402686a8 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,7 +22,7 @@ import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkEnv, SparkConf}
/**
@@ -38,16 +38,15 @@ trait CompressionCodec {
private[spark] object CompressionCodec {
- import org.apache.spark.SparkContext.globalConf
- def createCodec(): CompressionCodec = {
- createCodec(System.getProperty(
+ def createCodec(conf: SparkConf): CompressionCodec = {
+ createCodec(conf, conf.getOrElse(
"spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
}
- def createCodec(codecName: String): CompressionCodec = {
+ def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
.getConstructor(classOf[SparkConf])
- ctor.newInstance(globalConf).asInstanceOf[CompressionCodec]
+ ctor.newInstance(conf).asInstanceOf[CompressionCodec]
}
}
@@ -72,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
- val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt
+ val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 3e902f8ac5..697096fa76 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -593,10 +593,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
private[spark] object ConnectionManager {
- import SparkContext.globalConf
-
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999, globalConf)
+ val manager = new ConnectionManager(9999, new SparkConf)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
None
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 4ca3cd390b..1c9d6030d6 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -19,19 +19,19 @@ package org.apache.spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
+import org.apache.spark.SparkConf
private[spark] object ReceiverTest {
- import org.apache.spark.SparkContext.globalConf
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999, globalConf)
+ val manager = new ConnectionManager(9999, new SparkConf)
println("Started connection manager with id = " + manager.id)
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+
+ manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
- val buffer = ByteBuffer.wrap("response".getBytes())
+ val buffer = ByteBuffer.wrap("response".getBytes)
Some(Message.createBufferMessage(buffer, msg.id))
})
- Thread.currentThread.join()
+ Thread.currentThread.join()
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 11c21fc1d5..dcbd183c88 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -19,29 +19,29 @@ package org.apache.spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
+import org.apache.spark.SparkConf
private[spark] object SenderTest {
- import org.apache.spark.SparkContext.globalConf
def main(args: Array[String]) {
-
+
if (args.length < 2) {
println("Usage: SenderTest <target host> <target port>")
System.exit(1)
}
-
+
val targetHost = args(0)
val targetPort = args(1).toInt
val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
- val manager = new ConnectionManager(0, globalConf)
+ val manager = new ConnectionManager(0, new SparkConf)
println("Started connection manager with id = " + manager.id)
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+ manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
None
})
-
- val size = 100 * 1024 * 1024
+
+ val size = 100 * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
val count = 100
(0 until count).foreach(i => {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
- val startTime = System.currentTimeMillis
+ val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
case Some(response) =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index 81b3104afd..db28ddf9ac 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
- val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt
+ val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt
val fc = new FileClient(handler, connectTimeout)
try {
@@ -104,10 +104,10 @@ private[spark] object ShuffleCopier extends Logging {
val threads = if (args.length > 3) args(3).toInt else 10
val copiers = Executors.newFixedThreadPool(80)
- val tasks = (for (i <- Range(0, threads)) yield {
+ val tasks = (for (i <- Range(0, threads)) yield {
Executors.callable(new Runnable() {
def run() {
- val copier = new ShuffleCopier(SparkContext.globalConf)
+ val copier = new ShuffleCopier(new SparkConf)
copier.getBlock(host, port, blockId, echoResultCollectCallBack)
}
})
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 9fbe002748..2897c4b841 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -74,9 +74,6 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
}
private[spark] object CheckpointRDD extends Logging {
-
- import SparkContext.{globalConf => conf}
-
def splitIdToFile(splitId: Int): String = {
"part-%05d".format(splitId)
}
@@ -94,7 +91,7 @@ private[spark] object CheckpointRDD extends Logging {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
- val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
@@ -124,7 +121,7 @@ private[spark] object CheckpointRDD extends Logging {
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 911a002884..4ba4696fef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.changeValue(k, update)
}
- val ser = SparkEnv.get.serializerManager.get(serializerClass)
+ val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 3682c84598..0ccb309d0d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -59,7 +59,7 @@ class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[P] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context,
- SparkEnv.get.serializerManager.get(serializerClass))
+ SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf))
}
override def clearDependencies() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index aab30b1bb4..4f90c7d3d6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -93,7 +93,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
val partition = p.asInstanceOf[CoGroupPartition]
- val serializer = SparkEnv.get.serializerManager.get(serializerClass)
+ val serializer = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
val map = new JHashMap[K, ArrayBuffer[V]]
def getSeq(k: K): ArrayBuffer[V] = {
val seq = map.get(k)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 963d15b76d..77aa24e6b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -158,7 +158,8 @@ class DAGScheduler(
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+ val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
/**
* Starts the event processing actor. The actor has two responsibilities:
@@ -529,7 +530,7 @@ class DAGScheduler(
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
var finalStage: Stage = null
try {
- // New stage creation at times and if its not protected, the scheduler thread is killed.
+ // New stage creation at times and if its not protected, the scheduler thread is killed.
// e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted
finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 1791ee660d..90eb8a747f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
/**
* Parses and holds information about inputFormat (and files) specified as a parameter.
*/
-class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
+class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
val path: String) extends Logging {
var mapreduceInputFormat: Boolean = false
@@ -40,7 +40,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
validate()
- override def toString(): String = {
+ override def toString: String = {
"InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
}
@@ -125,7 +125,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
}
private def findPreferredLocations(): Set[SplitInfo] = {
- logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
+ logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
", inputFormatClazz : " + inputFormatClazz)
if (mapreduceInputFormat) {
return prefLocsFromMapreduceInputFormat()
@@ -143,14 +143,14 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
object InputFormatInfo {
/**
Computes the preferred locations based on input(s) and returned a location to block map.
- Typical use of this method for allocation would follow some algo like this
- (which is what we currently do in YARN branch) :
+ Typical use of this method for allocation would follow some algo like this:
+
a) For each host, count number of splits hosted on that host.
b) Decrement the currently allocated containers on that host.
c) Compute rack info for each host and update rack -> count map based on (b).
d) Allocate nodes based on (c)
- e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
- (even if data locality on that is very high) : this is to prevent fragility of job if a single
+ e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
+ (even if data locality on that is very high) : this is to prevent fragility of job if a single
(or small set of) hosts go down.
go to (a) until required nodes are allocated.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 3f55cd5642..60927831a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
class JobLogger(val user: String, val logDirName: String)
extends SparkListener with Logging {
- def this() = this(System.getProperty("user.name", "<unknown>"),
+ def this() = this(System.getProperty("user.name", "<unknown>"),
String.valueOf(System.currentTimeMillis()))
private val logDir =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 310ec62ca8..28f3ba53b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -32,7 +32,9 @@ private[spark] object ResultTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues)
+ // TODO: This object shouldn't have global variables
+ val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 9002d33cda..3cf995ea74 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -52,7 +52,7 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = Option(conf.get("spark.scheduler.allocation.file"))
+ val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 0f2deb4bcb..a37ead5632 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -37,7 +37,9 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues)
+ // TODO: This object shouldn't have global variables
+ val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues, new SparkConf)
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
@@ -152,7 +154,7 @@ private[spark] class ShuffleMapTask(
try {
// Obtain all the block writers for shuffle blocks.
- val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
+ val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
// Write the map output to its associated buckets.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 7e231ec44c..2707740d44 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -51,10 +51,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
{
val conf = sc.conf
// How often to check for speculative tasks
- val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong
+ val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong
+ val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong
// ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
@@ -91,7 +91,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- conf.getOrElse("spark.scheduler.mode", "FIFO"))
+ conf.getOrElse("spark.scheduler.mode", "FIFO"))
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -120,7 +120,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
override def start() {
backend.start()
- if (conf.getOrElse("spark.speculation", "false").toBoolean) {
+ if (conf.getOrElse("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 398b0cefbf..a46b16b92f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -52,14 +52,14 @@ private[spark] class ClusterTaskSetManager(
{
val conf = sched.sc.conf
// CPUs to request per task
- val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
+ val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
// Maximum times a task is allowed to fail before failing the job
- val MAX_TASK_FAILURES = conf.getOrElse("spark.task.maxFailures", "4").toInt
+ val MAX_TASK_FAILURES = conf.getOrElse("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation
- val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
- val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
+ val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+ val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -118,7 +118,7 @@ private[spark] class ClusterTaskSetManager(
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
- conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
+ conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
@@ -678,7 +678,7 @@ private[spark] class ClusterTaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
+ val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 40555903ac..156b01b149 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -62,7 +62,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
- val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong
+ val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong
import context.dispatcher
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
@@ -118,7 +118,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
removeExecutor(executorId, reason)
sender ! true
- case DisassociatedEvent(_, address, _) =>
+ case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
}
@@ -163,10 +163,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
override def start() {
val properties = new ArrayBuffer[(String, String)]
- val iterator = scheduler.sc.conf.getAllConfiguration
- while (iterator.hasNext) {
- val entry = iterator.next
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
properties += ((key, value))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
index 5367218faa..65d3fc8187 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
@@ -31,7 +31,4 @@ private[spark] trait SchedulerBackend {
def defaultParallelism(): Int
def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
-
- // Memory used by each executor (in megabytes)
- protected val executorMemory: Int = SparkContext.executorMemoryRequested
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index d01329b2b3..d74f000ebb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -31,7 +31,7 @@ private[spark] class SimrSchedulerBackend(
val tmpPath = new Path(driverFilePath + "_tmp")
val filePath = new Path(driverFilePath)
- val maxCores = conf.getOrElse("spark.simr.executor.cores", "1").toInt
+ val maxCores = conf.getOrElse("spark.simr.executor.cores", "1").toInt
override def start() {
super.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d6b8ac2d57..de69e3260d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
- val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
+ val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
client = new Client(sc.env.actorSystem, masters, appDesc, this, conf)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index ff6cc37f1d..319c91b933 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
extends Logging {
- private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads", "4").toInt
+ private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads", "4").toInt
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
THREADS, "Result resolver thread")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 2a3b0e15f7..1695374152 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -76,7 +76,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores", "0").toInt
+ val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores", "0").toInt
var nextMesosTaskId = 0
@@ -176,7 +176,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+ if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
@@ -192,7 +192,7 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", executorMemory))
+ .addResources(createResource("mem", sc.executorMemory))
.build()
d.launchTasks(offer.getId, Collections.singletonList(task), filters)
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 9bb92b4f01..8dfd4d5fb3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -114,7 +114,7 @@ private[spark] class MesosSchedulerBackend(
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
+ .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -199,7 +199,7 @@ private[spark] class MesosSchedulerBackend(
def enoughMemory(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
val slaveId = o.getSlaveId.getValue
- mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
}
for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
@@ -341,5 +341,5 @@ private[spark] class MesosSchedulerBackend(
}
// TODO: query Mesos for number of cores
- override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism", "8").toInt
+ override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism", "8").toInt
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 6069c1db3a..8498cffd31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -92,7 +92,7 @@ private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- conf.getOrElse("spark.scheduler.mode", "FIFO"))
+ conf.getOrElse("spark.scheduler.mode", "FIFO"))
val activeTaskSets = new HashMap[String, LocalTaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 4de81617b1..5d3d43623d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -21,6 +21,7 @@ import java.io._
import java.nio.ByteBuffer
import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.SparkConf
private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
@@ -77,6 +78,6 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
/**
* A Spark serializer that uses Java's built-in serialization.
*/
-class JavaSerializer extends Serializer {
+class JavaSerializer(conf: SparkConf) extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 17cec81038..2367f3f521 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -25,20 +25,21 @@ import com.esotericsoftware.kryo.{KryoException, Kryo}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
-import org.apache.spark.{SparkContext, SparkConf, SerializableWritable, Logging}
+import org.apache.spark._
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
import scala.util.Try
+import org.apache.spark.storage.PutBlock
+import org.apache.spark.storage.GetBlock
+import org.apache.spark.storage.GotBlock
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/
-class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
-
- private val conf = SparkContext.globalConf
+class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
private val bufferSize = {
- conf.getOrElse("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+ conf.getOrElse("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
}
def newKryoOutput() = new KryoOutput(bufferSize)
@@ -50,7 +51,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Do this before we invoke the user registrator so the user registrator can override this.
- kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking", "true").toBoolean)
+ kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking", "true").toBoolean)
for (cls <- KryoSerializer.toRegister) kryo.register(cls)
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 2955986fec..22465272f3 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.serializer
import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkConf
/**
@@ -32,12 +33,12 @@ private[spark] class SerializerManager {
def default = _default
- def setDefault(clsName: String): Serializer = {
- _default = get(clsName)
+ def setDefault(clsName: String, conf: SparkConf): Serializer = {
+ _default = get(clsName, conf)
_default
}
- def get(clsName: String): Serializer = {
+ def get(clsName: String, conf: SparkConf): Serializer = {
if (clsName == null) {
default
} else {
@@ -51,8 +52,9 @@ private[spark] class SerializerManager {
serializer = serializers.get(clsName)
if (serializer == null) {
val clsLoader = Thread.currentThread.getContextClassLoader
- serializer =
- Class.forName(clsName, true, clsLoader).newInstance().asInstanceOf[Serializer]
+ val cls = Class.forName(clsName, true, clsLoader)
+ val constructor = cls.getConstructor(classOf[SparkConf])
+ serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
serializers.put(clsName, serializer)
}
serializer
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index ee2ae471a9..3b25f68ca8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
fetchRequestsSync.put(request)
}
- copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads", "6").toInt)
+ copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads", "6").toInt)
logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
Utils.getUsedTimeMs(startTime))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ffd166e93a..16ee208617 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
- val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean
- val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt
+ val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean
+ val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
}
@@ -72,18 +72,18 @@ private[spark] class BlockManager(
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
val maxBytesInFlight =
- conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
// Whether to compress broadcast variables that are stored
- val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
+ val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
// Whether to compress shuffle output that are stored
- val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean
+ val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
- val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean
+ val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean
- val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+ val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
- val hostPort = Utils.localHostPort()
+ val hostPort = Utils.localHostPort(conf)
val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
@@ -101,8 +101,11 @@ private[spark] class BlockManager(
var heartBeatTask: Cancellable = null
- private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks)
- private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks)
+ private val metadataCleaner = new MetadataCleaner(
+ MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
+ private val broadcastCleaner = new MetadataCleaner(
+ MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+
initialize()
// The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -110,14 +113,14 @@ private[spark] class BlockManager(
// program could be using a user-defined codec in a third party jar, which is loaded in
// Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
// loaded yet.
- private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
+ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
serializer: Serializer, conf: SparkConf) = {
- this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties, conf)
+ this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf)
}
/**
@@ -127,7 +130,7 @@ private[spark] class BlockManager(
private def initialize() {
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
- if (!BlockManager.getDisableHeartBeatsForTesting) {
+ if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
heartBeat()
}
@@ -440,7 +443,7 @@ private[spark] class BlockManager(
: BlockFetcherIterator = {
val iter =
- if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) {
+ if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
} else {
new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -466,7 +469,7 @@ private[spark] class BlockManager(
def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
- val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean
+ val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
}
@@ -858,19 +861,18 @@ private[spark] class BlockManager(
private[spark] object BlockManager extends Logging {
- import org.apache.spark.SparkContext.{globalConf => conf}
val ID_GENERATOR = new IdGenerator
- def getMaxMemoryFromSystemProperties: Long = {
- val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble
+ def getMaxMemory(conf: SparkConf): Long = {
+ val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
- def getHeartBeatFrequencyFromSystemProperties: Long =
- conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+ def getHeartBeatFrequency(conf: SparkConf): Long =
+ conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
- def getDisableHeartBeatsForTesting: Boolean =
- conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+ def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
+ conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index fde7d63a68..8e4a88b20a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -31,8 +31,8 @@ private[spark]
class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
conf: SparkConf) extends Logging {
- val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries", "3").toInt
- val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait", "3000").toInt
+ val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 05502e4451..73a1da2de6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -53,7 +53,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
initLogging()
val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs",
- "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+ "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
val checkTimeoutInterval = conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs",
"60000").toLong
@@ -61,7 +61,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
var timeoutCheckingTask: Cancellable = null
override def preStart() {
- if (!BlockManager.getDisableHeartBeatsForTesting) {
+ if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 8f528babd4..7697092e1b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories", "64").toInt
+ private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories", "64").toInt
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 850d3178dd..f592df283a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -62,12 +62,13 @@ private[spark] trait ShuffleWriterGroup {
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
def conf = blockManager.conf
+
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
// TODO: Remove this once the shuffle file consolidation feature is stable.
val consolidateShuffleFiles =
- conf.getOrElse("spark.shuffle.consolidateFiles", "false").toBoolean
+ conf.getOrElse("spark.shuffle.consolidateFiles", "false").toBoolean
- private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+ private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb", "100").toInt * 1024
/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
@@ -82,8 +83,8 @@ class ShuffleBlockManager(blockManager: BlockManager) {
type ShuffleId = Int
private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
- private
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
+ private val metadataCleaner =
+ new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
new ShuffleWriterGroup {
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
index d52b3d8284..40734aab49 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -56,7 +56,7 @@ object StoragePerfTester {
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
- new KryoSerializer())
+ new KryoSerializer(sc.conf))
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {
writers(i % numOutputSplits).write(writeData)
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index b3b3893393..dca98c6c05 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -22,7 +22,7 @@ import akka.actor._
import java.util.concurrent.ArrayBlockingQueue
import util.Random
import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
/**
* This class tests the BlockManager and MemoryStore for thread safety and
@@ -92,8 +92,8 @@ private[spark] object ThreadingTest {
def main(args: Array[String]) {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
- val conf = SparkContext.globalConf
- val serializer = new KryoSerializer
+ val conf = new SparkConf()
+ val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
val blockManager = new BlockManager(
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 14751e8e8e..58d47a201d 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui
import scala.util.Random
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.scheduler.SchedulingMode
@@ -31,7 +31,6 @@ import org.apache.spark.scheduler.SchedulingMode
*/
private[spark] object UIWorkloadGenerator {
- import SparkContext.{globalConf => conf}
val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 5000
@@ -40,14 +39,14 @@ private[spark] object UIWorkloadGenerator {
println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
- val master = args(0)
- val schedulingMode = SchedulingMode.withName(args(1))
- val appName = "Spark UI Tester"
+ val conf = new SparkConf().setMaster(args(0)).setAppName("Spark UI tester")
+
+ val schedulingMode = SchedulingMode.withName(args(1))
if (schedulingMode == SchedulingMode.FAIR) {
- conf.set("spark.scheduler.mode", "FAIR")
+ conf.set("spark.scheduler.mode", "FAIR")
}
- val sc = new SparkContext(master, appName)
+ val sc = new SparkContext(conf)
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
@@ -57,11 +56,11 @@ private[spark] object UIWorkloadGenerator {
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
- def nextFloat() = (new Random()).nextFloat()
+ def nextFloat() = new Random().nextFloat()
val jobs = Seq[(String, () => Long)](
("Count", baseData.count),
- ("Cache and Count", baseData.map(x => x).cache.count),
+ ("Cache and Count", baseData.map(x => x).cache().count),
("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
("Entirely failed phase", baseData.map(x => throw new Exception).count),
("Partially failed phase", {
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index b637d37517..91fa00a66c 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -63,7 +63,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
val classPathEntries = classPathProperty._2
- .split(sc.conf.getOrElse("path.separator", ":"))
+ .split(sc.conf.getOrElse("path.separator", ":"))
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f01a1380b9..6ff8e9fb14 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
*/
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
- val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages", "1000").toInt
+ val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages", "1000").toInt
val DEFAULT_POOL_NAME = "default"
val stageIdToPool = new HashMap[Int, String]()
@@ -105,7 +105,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]())
stages += stage
}
-
+
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val sid = taskStart.task.stageId
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 76febd5702..58b26f7f12 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -41,19 +41,19 @@ private[spark] object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
conf: SparkConf): (ActorSystem, Int) = {
- val akkaThreads = conf.getOrElse("spark.akka.threads", "4").toInt
- val akkaBatchSize = conf.getOrElse("spark.akka.batchSize", "15").toInt
+ val akkaThreads = conf.getOrElse("spark.akka.threads", "4").toInt
+ val akkaBatchSize = conf.getOrElse("spark.akka.batchSize", "15").toInt
- val akkaTimeout = conf.getOrElse("spark.akka.timeout", "100").toInt
+ val akkaTimeout = conf.getOrElse("spark.akka.timeout", "100").toInt
- val akkaFrameSize = conf.getOrElse("spark.akka.frameSize", "10").toInt
+ val akkaFrameSize = conf.getOrElse("spark.akka.frameSize", "10").toInt
val lifecycleEvents =
- if (conf.getOrElse("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+ if (conf.getOrElse("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses", "600").toInt
+ val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
- conf.getOrElse("spark.akka.failure-detector.threshold", "300.0").toDouble
- val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval", "1000").toInt
+ conf.getOrElse("spark.akka.failure-detector.threshold", "300.0").toDouble
+ val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval", "1000").toInt
val akkaConf = ConfigFactory.parseString(
s"""
@@ -89,6 +89,6 @@ private[spark] object AkkaUtils {
/** Returns the default Spark timeout to use for Akka ask operations. */
def askTimeout(conf: SparkConf): FiniteDuration = {
- Duration.create(conf.getOrElse("spark.akka.askTimeout", "30").toLong, "seconds")
+ Duration.create(conf.getOrElse("spark.akka.askTimeout", "30").toLong, "seconds")
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index bf71d17a21..431d88838f 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,16 +18,21 @@
package org.apache.spark.util
import java.util.{TimerTask, Timer}
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
-class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
+class MetadataCleaner(
+ cleanerType: MetadataCleanerType.MetadataCleanerType,
+ cleanupFunc: (Long) => Unit,
+ conf: SparkConf)
+ extends Logging
+{
val name = cleanerType.toString
- private val delaySeconds = MetadataCleaner.getDelaySeconds
+ private val delaySeconds = MetadataCleaner.getDelaySeconds(conf)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)
@@ -65,22 +70,28 @@ object MetadataCleanerType extends Enumeration {
def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
}
+// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
+// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
object MetadataCleaner {
- private val conf = SparkContext.globalConf
- // using only sys props for now : so that workers can also get to it while preserving earlier behavior.
- def getDelaySeconds = conf.getOrElse("spark.cleaner.ttl", "3500").toInt //TODO: this is to fix tests for time being
+ def getDelaySeconds(conf: SparkConf) = {
+ conf.getOrElse("spark.cleaner.ttl", "3500").toInt
+ }
- def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
- conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
+ def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
+ {
+ conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
+ .toInt
}
- def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
+ def setDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType,
+ delay: Int)
+ {
conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}
- def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
+ def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) {
// override for all ?
- conf.set("spark.cleaner.ttl", delay.toString)
+ conf.set("spark.cleaner.ttl", delay.toString)
if (resetAll) {
for (cleanerType <- MetadataCleanerType.values) {
System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 1407c39bfb..bddb3bb735 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -30,10 +30,10 @@ import java.lang.management.ManagementFactory
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkEnv, SparkConf, SparkContext, Logging}
/**
- * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
+ * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
* memory-aware caches.
*
* Based on the following JavaWorld article:
@@ -41,7 +41,6 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
*/
private[spark] object SizeEstimator extends Logging {
- private def conf = SparkContext.globalConf
// Sizes of primitive types
private val BYTE_SIZE = 1
private val BOOLEAN_SIZE = 1
@@ -90,9 +89,11 @@ private[spark] object SizeEstimator extends Logging {
classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
}
- private def getIsCompressedOops : Boolean = {
- if (conf.getOrElse("spark.test.useCompressedOops", null) != null) {
- return conf.get("spark.test.useCompressedOops").toBoolean
+ private def getIsCompressedOops: Boolean = {
+ // This is only used by tests to override the detection of compressed oops. The test
+ // actually uses a system property instead of a SparkConf, so we'll stick with that.
+ if (System.getProperty("spark.test.useCompressedOops") != null) {
+ return System.getProperty("spark.test.useCompressedOops").toBoolean
}
try {
@@ -104,7 +105,7 @@ private[spark] object SizeEstimator extends Logging {
val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
Class.forName("java.lang.String"))
- val bean = ManagementFactory.newPlatformMXBeanProxy(server,
+ val bean = ManagementFactory.newPlatformMXBeanProxy(server,
hotSpotMBeanName, hotSpotMBeanClass)
// TODO: We could use reflection on the VMOption returned ?
return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
@@ -252,7 +253,7 @@ private[spark] object SizeEstimator extends Logging {
if (info != null) {
return info
}
-
+
val parent = getClassInfo(cls.getSuperclass)
var shellSize = parent.shellSize
var pointerFields = parent.pointerFields
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fd5888e525..b6b89cc7bb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -36,15 +36,13 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
/**
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
-
- private lazy val conf = SparkContext.globalConf
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -240,9 +238,9 @@ private[spark] object Utils extends Logging {
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
- def fetchFile(url: String, targetDir: File) {
+ def fetchFile(url: String, targetDir: File, conf: SparkConf) {
val filename = url.split("/").last
- val tempDir = getLocalDir
+ val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
@@ -312,7 +310,7 @@ private[spark] object Utils extends Logging {
* return a single directory, even though the spark.local.dir property might be a list of
* multiple paths.
*/
- def getLocalDir: String = {
+ def getLocalDir(conf: SparkConf): String = {
conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
}
@@ -398,7 +396,7 @@ private[spark] object Utils extends Logging {
InetAddress.getByName(address).getHostName
}
- def localHostPort(): String = {
+ def localHostPort(conf: SparkConf): String = {
val retval = conf.getOrElse("spark.hostPort", null)
if (retval == null) {
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
@@ -838,7 +836,7 @@ private[spark] object Utils extends Logging {
}
}
- /**
+ /**
* Timing method based on iterations that permit JVM JIT optimization.
* @param numIters number of iterations
* @param f function to be executed
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ab81bfbe55..8d7546085f 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.io
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
class CompressionCodecSuite extends FunSuite {
+ val conf = new SparkConf(false)
def testCodec(codec: CompressionCodec) {
// Write 1000 integers to the output stream, compressed.
@@ -43,19 +45,19 @@ class CompressionCodecSuite extends FunSuite {
}
test("default compression codec") {
- val codec = CompressionCodec.createCodec()
+ val codec = CompressionCodec.createCodec(conf)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
test("lzf compression codec") {
- val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+ val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
test("snappy compression codec") {
- val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+ val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 2bb827c022..3711382f2e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -82,7 +82,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
private val conf = new SparkConf
- val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
+ val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c016c51171..33b0148896 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -22,12 +22,14 @@ import scala.collection.mutable
import com.esotericsoftware.kryo.Kryo
import org.scalatest.FunSuite
-import org.apache.spark.SharedSparkContext
+import org.apache.spark.{SparkConf, SharedSparkContext}
import org.apache.spark.serializer.KryoTest._
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
+ val conf = new SparkConf(false)
+
test("basic types") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -57,7 +59,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("pairs") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -81,7 +83,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("Scala data structures") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -104,7 +106,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("ranges") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
@@ -127,7 +129,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("custom registrator") {
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 4ef5538951..a0fc3445be 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.{SparkConf, SparkContext}
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
- private val conf = new SparkConf
+ private val conf = new SparkConf(false)
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
@@ -45,7 +45,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer.mb", "1")
- val serializer = new KryoSerializer
+ val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -167,7 +167,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("master + 2 managers interaction") {
store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
- store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000, conf)
+ store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -654,7 +654,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200, conf)
+ store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf)
// The put should fail since a1 is not serializable.
class UnserializableClass
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index a5facd5bbd..11ebdc352b 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -140,8 +140,6 @@ class SizeEstimatorSuite
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
- SparkContext.globalConf.set("os.arch", "amd64")
- SparkContext.globalConf.set("spark.test.useCompressedOops", "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()