aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-12-24 18:30:31 +0530
committerPrashant Sharma <scrapcodes@gmail.com>2013-12-25 00:09:36 +0530
commit2573add94cf920a88f74d80d8ea94218d812704d (patch)
tree9eb07c85cadbaea90a8e9742687adda924342b42
parent0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff)
downloadspark-2573add94cf920a88f74d80d8ea94218d812704d.tar.gz
spark-2573add94cf920a88f74d80d8ea94218d812704d.tar.bz2
spark-2573add94cf920a88f74d80d8ea94218d812704d.zip
spark-544, introducing SparkConf and related configuration overhaul.
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala71
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala140
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala44
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala28
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/network/ReceiverTest.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/network/SenderTest.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala38
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/SparkUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala16
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala23
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala95
-rw-r--r--core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala10
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala13
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala16
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala4
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala2
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala4
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala4
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala4
-rw-r--r--project/SparkBuild.scala3
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala7
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala18
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala11
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala16
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala6
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala4
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala4
96 files changed, 612 insertions, 478 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ccffcc356c..4520edb10d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -50,9 +50,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
}
}
-private[spark] class MapOutputTracker extends Logging {
+private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
- private val timeout = AkkaUtils.askTimeout
+ private val timeout = AkkaUtils.askTimeout(conf)
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
@@ -192,7 +192,8 @@ private[spark] class MapOutputTracker extends Logging {
}
}
-private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
+private[spark] class MapOutputTrackerMaster(conf: SparkConf)
+ extends MapOutputTracker(conf) {
// Cache a serialized version of the output statuses for each shuffle to send them out faster
private var cacheEpoch = epoch
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index bcec41c439..04c1eedfeb 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -32,6 +32,8 @@ abstract class Partitioner extends Serializable {
}
object Partitioner {
+
+ import SparkContext.{globalConf => conf}
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
@@ -52,7 +54,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
- if (System.getProperty("spark.default.parallelism") != null) {
+ if (conf.getOrElse("spark.default.parallelism", null) != null) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
new file mode 100644
index 0000000000..9a4eefad2e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -0,0 +1,71 @@
+package org.apache.spark
+
+import scala.collection.JavaConversions._
+import scala.collection.concurrent.TrieMap
+
+import com.typesafe.config.ConfigFactory
+
+private[spark] class SparkConf(loadClasspathRes: Boolean = true) extends Serializable {
+ @transient lazy val config = ConfigFactory.systemProperties()
+ .withFallback(ConfigFactory.parseResources("spark.conf"))
+ // TODO this should actually be synchronized
+ private val configMap = TrieMap[String, String]()
+
+ if (loadClasspathRes && !config.entrySet().isEmpty) {
+ for (e <- config.entrySet()) {
+ configMap += ((e.getKey, e.getValue.unwrapped().toString))
+ }
+ }
+
+ def setMasterUrl(master: String) = {
+ if (master != null)
+ configMap += (("spark.master", master))
+ this
+ }
+
+ def setAppName(name: String) = {
+ if (name != null)
+ configMap += (("spark.appName", name))
+ this
+ }
+
+ def setJars(jars: Seq[String]) = {
+ if (!jars.isEmpty)
+ configMap += (("spark.jars", jars.mkString(",")))
+ this
+ }
+
+ def set(k: String, value: String) = {
+ configMap += ((k, value))
+ this
+ }
+
+ def setSparkHome(home: String) = {
+ if (home != null)
+ configMap += (("spark.home", home))
+ this
+ }
+
+ def set(map: Seq[(String, String)]) = {
+ if (map != null && !map.isEmpty)
+ configMap ++= map
+ this
+ }
+
+ def get(k: String): String = {
+ configMap(k)
+ }
+
+ def getAllConfiguration = configMap.clone.entrySet().iterator
+
+ def getOrElse(k: String, defaultValue: String): String = {
+ configMap.getOrElse(k, defaultValue)
+ }
+
+ override def clone: SparkConf = {
+ val conf = new SparkConf(false)
+ conf.set(configMap.toSeq)
+ conf
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a0f794edfd..4300b07bdb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,91 +22,99 @@ import java.net.URI
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.Map
+import scala.collection.{Map, immutable}
+import scala.collection.JavaConversions._
import scala.collection.generic.Growable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.ArrayWritable
-import org.apache.hadoop.io.BooleanWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+TextInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-
import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
- SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend,
+SimrSchedulerBackend, SparkDeploySchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend,
+MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalScheduler
-import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
- TimeStampedHashMap, Utils}
+import org.apache.spark.util._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI.
- * @param sparkHome Location where Spark is installed on cluster nodes.
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param conf a Spark Config object describing the context configuration. Any settings in this
+ * config overrides the default configs as well as system properties.
+ *
* @param environment Environment variables to set on worker nodes.
*/
class SparkContext(
- val master: String,
- val appName: String,
- val sparkHome: String = null,
- val jars: Seq[String] = Nil,
+ val conf: SparkConf,
val environment: Map[String, String] = Map(),
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
// of data-local splits on host
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map())
+ val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = immutable.Map())
extends Logging {
- // Ensure logging is initialized before we spawn any threads
- initLogging()
+ /**
+ * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
+ * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes.
+ */
+ def this(master: String, appName: String, sparkHome: String = null,
+ jars: Seq[String] = Nil, environment: Map[String, String] = Map(),
+ preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+ immutable.Map()) =
+ this(new SparkConf(false).setAppName(appName).setMasterUrl(master)
+ .setJars(jars).set(environment.toSeq).setSparkHome(sparkHome),
+ environment, preferredNodeLocationData)
// Set Spark driver host and port system properties
- if (System.getProperty("spark.driver.host") == null) {
- System.setProperty("spark.driver.host", Utils.localHostName())
- }
- if (System.getProperty("spark.driver.port") == null) {
- System.setProperty("spark.driver.port", "0")
- }
+ Try(conf.get("spark.driver.host"))
+ .getOrElse(conf.set("spark.driver.host", Utils.localHostName()))
+
+ Try(conf.get("spark.driver.port"))
+ .getOrElse(conf.set("spark.driver.port", "0"))
+
+ val jars: Seq[String] = if (conf.getOrElse("spark.jars", null) != null) {
+ conf.get("spark.jars").split(",")
+ } else null
+
+ val master = conf.get("spark.master")
+ val appName = conf.get("spark.appName")
val isLocal = (master == "local" || master.startsWith("local["))
+ // Ensure logging is initialized before we spawn any threads
+ initLogging()
+
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
"<driver>",
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port").toInt,
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port").toInt,
+ conf,
true,
isLocal)
SparkEnv.set(env)
@@ -165,24 +173,24 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val env = SparkEnv.get
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
Utils.getSystemProperties.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
- conf.set(key.substring("spark.hadoop.".length), value)
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
- val bufferSize = System.getProperty("spark.buffer.size", "65536")
- conf.set("io.file.buffer.size", bufferSize)
- conf
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
+ hadoopConf.set("io.file.buffer.size", bufferSize)
+ hadoopConf
}
private[spark] var checkpointDir: Option[String] = None
@@ -695,10 +703,8 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
- if (sparkHome != null) {
- Some(sparkHome)
- } else if (System.getProperty("spark.home") != null) {
- Some(System.getProperty("spark.home"))
+ if (conf.getOrElse("spark.home", null) != null) {
+ Some(conf.get("spark.home"))
} else if (System.getenv("SPARK_HOME") != null) {
Some(System.getenv("SPARK_HOME"))
} else {
@@ -909,6 +915,14 @@ object SparkContext {
private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
+ private lazy val conf = new SparkConf()
+
+ private[spark] def globalConf = {
+ if (SparkEnv.get != null) {
+ SparkEnv.get.conf
+ } else conf
+ }
+
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
@@ -1020,7 +1034,7 @@ object SparkContext {
/** Get the amount of memory per executor requested through system properties or SPARK_MEM */
private[spark] val executorMemoryRequested = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
- Option(System.getProperty("spark.executor.memory"))
+ Try(globalConf.get("spark.executor.memory")).toOption
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
@@ -1123,7 +1137,7 @@ object SparkContext {
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(sc)
- val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+ val coarseGrained = globalConf.getOrElse("spark.mesos.coarse", "false").toBoolean
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 826f5c2d8c..78e4ae27b2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -54,7 +54,8 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
- val metricsSystem: MetricsSystem) {
+ val metricsSystem: MetricsSystem,
+ val conf: SparkConf) {
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -114,25 +115,27 @@ object SparkEnv extends Logging {
executorId: String,
hostname: String,
port: Int,
+ conf: SparkConf,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port,
+ conf = conf)
// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.driver.port to it.
if (isDriver && port == 0) {
- System.setProperty("spark.driver.port", boundPort.toString)
+ conf.set("spark.driver.port", boundPort.toString)
}
// set only if unset until now.
- if (System.getProperty("spark.hostPort", null) == null) {
+ if (conf.getOrElse("spark.hostPort", null) == null) {
if (!isDriver){
// unexpected
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
}
Utils.checkHost(hostname)
- System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+ conf.set("spark.hostPort", hostname + ":" + boundPort)
}
val classLoader = Thread.currentThread.getContextClassLoader
@@ -140,25 +143,25 @@ object SparkEnv extends Logging {
// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
- val name = System.getProperty(propertyName, defaultClassName)
+ val name = conf.getOrElse(propertyName, defaultClassName)
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
}
val serializerManager = new SerializerManager
val serializer = serializerManager.setDefault(
- System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
val closureSerializer = serializerManager.get(
- System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
if (isDriver) {
logInfo("Registering " + name)
Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
- val driverHost: String = System.getProperty("spark.driver.host", "localhost")
- val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+ val driverHost: String = conf.getOrElse("spark.driver.host", "localhost")
+ val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
@@ -168,21 +171,21 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
- new BlockManagerMasterActor(isLocal)))
- val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
+ new BlockManagerMasterActor(isLocal, conf)), conf)
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val connectionManager = blockManager.connectionManager
- val broadcastManager = new BroadcastManager(isDriver)
+ val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)
// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
val mapOutputTracker = if (isDriver) {
- new MapOutputTrackerMaster()
+ new MapOutputTrackerMaster(conf)
} else {
- new MapOutputTracker()
+ new MapOutputTracker(conf)
}
mapOutputTracker.trackerActor = registerOrLookup(
"MapOutputTracker",
@@ -193,12 +196,12 @@ object SparkEnv extends Logging {
val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
- System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ conf.set("spark.fileserver.uri", httpFileServer.serverUri)
val metricsSystem = if (isDriver) {
- MetricsSystem.createMetricsSystem("driver")
+ MetricsSystem.createMetricsSystem("driver", conf)
} else {
- MetricsSystem.createMetricsSystem("executor")
+ MetricsSystem.createMetricsSystem("executor", conf)
}
metricsSystem.start()
@@ -212,7 +215,7 @@ object SparkEnv extends Logging {
}
// Warn about deprecated spark.cache.class property
- if (System.getProperty("spark.cache.class") != null) {
+ if (conf.getOrElse("spark.cache.class", null) != null) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
@@ -231,6 +234,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
- metricsSystem)
+ metricsSystem,
+ conf)
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index ca42c76928..d6eacfe23e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
override def getPartitions = parent.partitions
@@ -247,10 +247,10 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
*/
private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
-
+ import SparkContext.{globalConf => conf}
Utils.checkHost(serverHost, "Expected hostname")
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 43c18294c5..be99d229ef 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -32,7 +32,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
}
private[spark]
-class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable {
+class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
@@ -43,14 +43,14 @@ class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable
private def initialize() {
synchronized {
if (!initialized) {
- val broadcastFactoryClass = System.getProperty(
+ val broadcastFactoryClass = conf.getOrElse(
"spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
- broadcastFactory.initialize(isDriver)
+ broadcastFactory.initialize(isDriver, conf)
initialized = true
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index 68bff75b90..fb161ce69d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -17,6 +17,8 @@
package org.apache.spark.broadcast
+import org.apache.spark.SparkConf
+
/**
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
@@ -24,7 +26,7 @@ package org.apache.spark.broadcast
* entire Spark job.
*/
private[spark] trait BroadcastFactory {
- def initialize(isDriver: Boolean): Unit
+ def initialize(isDriver: Boolean, conf: SparkConf): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 47db720416..cecb8c228b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import org.apache.spark.{HttpServer, Logging, SparkEnv}
+import org.apache.spark.{SparkConf, HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils}
@@ -64,7 +64,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
}
private[spark] class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isDriver: Boolean) { HttpBroadcast.initialize(isDriver) }
+ def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)
@@ -88,15 +88,16 @@ private object HttpBroadcast extends Logging {
private lazy val compressionCodec = CompressionCodec.createCodec()
- def initialize(isDriver: Boolean) {
+ def initialize(isDriver: Boolean, conf: SparkConf) {
synchronized {
if (!initialized) {
- bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
if (isDriver) {
createServer()
+ conf.set("spark.httpBroadcast.uri", serverUri)
}
- serverUri = System.getProperty("spark.httpBroadcast.uri")
+ serverUri = conf.get("spark.httpBroadcast.uri")
initialized = true
}
}
@@ -118,7 +119,6 @@ private object HttpBroadcast extends Logging {
server = new HttpServer(broadcastDir)
server.start()
serverUri = server.uri
- System.setProperty("spark.httpBroadcast.uri", serverUri)
logInfo("Broadcast server started at " + serverUri)
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 073a0a5029..4a3801dc48 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -166,8 +166,9 @@ private object TorrentBroadcast
extends Logging {
private var initialized = false
-
- def initialize(_isDriver: Boolean) {
+ private var conf: SparkConf = null
+ def initialize(_isDriver: Boolean, conf: SparkConf) {
+ TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
@@ -179,7 +180,7 @@ extends Logging {
initialized = false
}
- val BLOCK_SIZE = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024
+ lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024
def blockifyObject[T](obj: T): TorrentInfo = {
val byteArray = Utils.serialize[T](obj)
@@ -238,7 +239,7 @@ private[spark] case class TorrentInfo(
private[spark] class TorrentBroadcastFactory
extends BroadcastFactory {
- def initialize(isDriver: Boolean) { TorrentBroadcast.initialize(isDriver) }
+ def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 19d393a0db..dda43dc018 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -26,7 +26,7 @@ private[spark] class ApplicationDescription(
val appUiUrl: String)
extends Serializable {
- val user = System.getProperty("user.name", "<unknown>")
+ val user = System.getProperty("user.name", "<unknown>")
override def toString: String = "ApplicationDescription(" + name + ")"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fc1537f796..1c979ac3e0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -67,8 +67,9 @@ class SparkHadoopUtil {
}
object SparkHadoopUtil {
+ import SparkContext.{globalConf => conf}
private val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ val yarnMode = java.lang.Boolean.valueOf(conf.getOrElse("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 953755e40d..9bbd635ab9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -19,20 +19,18 @@ package org.apache.spark.deploy.client
import java.util.concurrent.TimeoutException
-import scala.concurrent.duration._
import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.AkkaUtils
-
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
@@ -43,7 +41,8 @@ private[spark] class Client(
actorSystem: ActorSystem,
masterUrls: Array[String],
appDescription: ApplicationDescription,
- listener: ClientListener)
+ listener: ClientListener,
+ conf: SparkConf)
extends Logging {
val REGISTRATION_TIMEOUT = 20.seconds
@@ -178,7 +177,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 5b62d3ba6c..426cf524ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.client
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.deploy.{Command, ApplicationDescription}
private[spark] object TestClient {
@@ -45,11 +45,12 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+ conf = SparkContext.globalConf)
val desc = new ApplicationDescription(
"TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
val listener = new TestListener
- val client = new Client(actorSystem, Array(url), desc, listener)
+ val client = new Client(actorSystem, Array(url), desc, listener, SparkContext.globalConf)
client.start()
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd0794b8..2c162c4fa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkContext, Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
@@ -39,13 +39,13 @@ import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
-
+ val conf = SparkContext.globalConf
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
- val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
- val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+ val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt
+ val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -63,8 +63,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
- val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
- val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf)
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf)
val masterSource = new MasterSource(this)
val webUi = new MasterWebUI(this, webUiPort)
@@ -86,7 +86,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -103,7 +103,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
persistenceEngine = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
- new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+ new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
case "FILESYSTEM" =>
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
@@ -113,7 +113,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
- context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+ context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
case _ =>
context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
}
@@ -507,7 +507,7 @@ private[spark] object Master {
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
- val args = new MasterArguments(argStrings)
+ val args = new MasterArguments(argStrings, SparkContext.globalConf)
val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
actorSystem.awaitTermination()
}
@@ -523,9 +523,10 @@ private[spark] object Master {
}
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+ conf = SparkContext.globalConf)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(SparkContext.globalConf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 9d89b455fb..7ce83f9c36 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -18,11 +18,12 @@
package org.apache.spark.deploy.master
import org.apache.spark.util.{Utils, IntParam}
+import org.apache.spark.SparkConf
/**
* Command-line parser for the master.
*/
-private[spark] class MasterArguments(args: Array[String]) {
+private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
@@ -37,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String]) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
- if (System.getProperty("master.ui.port") != null) {
- webUiPort = System.getProperty("master.ui.port").toInt
+ if (conf.get("master.ui.port") != null) {
+ webUiPort = conf.get("master.ui.port").toInt
}
parse(args.toList)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 6cc7fd2ff4..79d95b1a83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -23,7 +23,7 @@ import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.data.Stat
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@@ -35,8 +35,9 @@ import org.apache.spark.Logging
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g., "node already exists").
*/
-private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
- val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
+ conf: SparkConf) extends Logging {
+ val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
val ZK_TIMEOUT_MILLIS = 30000
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 7d535b08de..df5bb368a2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -21,16 +21,17 @@ import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.master.MasterMessages._
-private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
+ masterUrl: String, conf: SparkConf)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+ val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private val watcher = new ZooKeeperWatcher()
- private val zk = new SparkZooKeeperSession(this)
+ private val zk = new SparkZooKeeperSession(this, conf)
private var status = LeadershipStatus.NOT_LEADER
private var myLeaderFile: String = _
private var leaderUrl: String = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b3bb..c55b720422 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,19 +17,19 @@
package org.apache.spark.deploy.master
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.zookeeper._
import akka.serialization.Serialization
-class ZooKeeperPersistenceEngine(serialization: Serialization)
+class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
extends PersistenceEngine
with SparkZooKeeperWatcher
with Logging
{
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
- val zk = new SparkZooKeeperSession(this)
+ val zk = new SparkZooKeeperSession(this, conf)
zk.connect()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 9ab594b682..ead35662fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(master.conf)
val host = Utils.localHostName()
val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6719..75a6e75c78 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,23 +25,14 @@ import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor._
-import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -53,7 +44,8 @@ private[spark] class Worker(
cores: Int,
memory: Int,
masterUrls: Array[String],
- workDirPath: String = null)
+ workDirPath: String = null,
+ val conf: SparkConf)
extends Actor with Logging {
import context.dispatcher
@@ -63,7 +55,7 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
- val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+ val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -92,7 +84,7 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
- val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+ val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf)
val workerSource = new WorkerSource(this)
def coresFree: Int = cores - coresUsed
@@ -275,6 +267,7 @@ private[spark] class Worker(
}
private[spark] object Worker {
+ import org.apache.spark.SparkContext.globalConf
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
@@ -287,9 +280,10 @@ private[spark] object Worker {
: (ActorSystem, Int) = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+ conf = globalConf)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir), name = "Worker")
+ masterUrls, workDir, globalConf), name = "Worker")
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 40d6bdb3fd..ec47ba1b56 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,7 +22,7 @@ import java.io.File
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
@@ -34,10 +34,10 @@ import org.apache.spark.util.{AkkaUtils, Utils}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(worker.conf)
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
- System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+ worker.conf.getOrElse("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
var server: Option[Server] = None
var boundPort: Option[Int] = None
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index debbdd4c44..c8319f6f6e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -98,10 +98,10 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
- indestructible = true)
+ indestructible = true, conf = SparkContext.globalConf)
// set it
val sparkHostPort = hostname + ":" + boundPort
- System.setProperty("spark.hostPort", sparkHostPort)
+// conf.set("spark.hostPort", sparkHostPort)
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f19d7a96b..70fc30e993 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -57,17 +57,17 @@ private[spark] class Executor(
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)
-
+ val conf = new SparkConf(false)
// Set spark.* system properties from executor arg
for ((key, value) <- properties) {
- System.setProperty(key, value)
+ conf.set(key, value)
}
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
- System.setProperty("spark.local.dir", getYarnLocalDirs())
+ conf.set("spark.local.dir", getYarnLocalDirs())
}
// Create our ClassLoader and set it on this thread
@@ -108,7 +108,7 @@ private[spark] class Executor(
// Initialize Spark environment (using system properties read above)
private val env = {
if (!isLocal) {
- val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0,
+ val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
isDriver = false, isLocal = false)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
@@ -303,7 +303,7 @@ private[spark] class Executor(
* new classes defined by the REPL as the user types code
*/
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
- val classUri = System.getProperty("spark.repl.class.uri")
+ val classUri = conf.getOrElse("spark.repl.class.uri", null)
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
try {
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 570a979b56..8ef5019b6c 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,6 +22,7 @@ import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.apache.spark.SparkConf
/**
@@ -37,15 +38,16 @@ trait CompressionCodec {
private[spark] object CompressionCodec {
-
+ import org.apache.spark.SparkContext.globalConf
def createCodec(): CompressionCodec = {
createCodec(System.getProperty(
"spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
}
def createCodec(codecName: String): CompressionCodec = {
- Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
- .newInstance().asInstanceOf[CompressionCodec]
+ val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
+ .getConstructor(classOf[SparkConf])
+ ctor.newInstance(globalConf).asInstanceOf[CompressionCodec]
}
}
@@ -53,7 +55,7 @@ private[spark] object CompressionCodec {
/**
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
*/
-class LZFCompressionCodec extends CompressionCodec {
+class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
@@ -67,10 +69,10 @@ class LZFCompressionCodec extends CompressionCodec {
* Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by spark.io.compression.snappy.block.size.
*/
-class SnappyCompressionCodec extends CompressionCodec {
+class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
- val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+ val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index bec0c83be8..ac29816f19 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
import org.apache.spark.metrics.source.Source
@@ -62,10 +62,11 @@ import org.apache.spark.metrics.source.Source
*
* [options] is the specific property of this source or sink.
*/
-private[spark] class MetricsSystem private (val instance: String) extends Logging {
+private[spark] class MetricsSystem private (val instance: String,
+ conf: SparkConf) extends Logging {
initLogging()
- val confFile = System.getProperty("spark.metrics.conf")
+ val confFile = conf.getOrElse("spark.metrics.conf", null)
val metricsConfig = new MetricsConfig(Option(confFile))
val sinks = new mutable.ArrayBuffer[Sink]
@@ -159,5 +160,6 @@ private[spark] object MetricsSystem {
}
}
- def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+ def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem =
+ new MetricsSystem(instance, conf)
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 703bc6a9ca..3e902f8ac5 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -37,7 +37,7 @@ import scala.concurrent.duration._
import org.apache.spark.util.Utils
-private[spark] class ConnectionManager(port: Int) extends Logging {
+private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Logging {
class MessageStatus(
val message: Message,
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private val selector = SelectorProvider.provider.openSelector()
private val handleMessageExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.handler.threads.min","20").toInt,
- System.getProperty("spark.core.connection.handler.threads.max","60").toInt,
- System.getProperty("spark.core.connection.handler.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.getOrElse("spark.core.connection.handler.threads.min", "20").toInt,
+ conf.getOrElse("spark.core.connection.handler.threads.max", "60").toInt,
+ conf.getOrElse("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val handleReadWriteExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.io.threads.min","4").toInt,
- System.getProperty("spark.core.connection.io.threads.max","32").toInt,
- System.getProperty("spark.core.connection.io.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.getOrElse("spark.core.connection.io.threads.min", "4").toInt,
+ conf.getOrElse("spark.core.connection.io.threads.max", "32").toInt,
+ conf.getOrElse("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
// Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.connect.threads.min","1").toInt,
- System.getProperty("spark.core.connection.connect.threads.max","8").toInt,
- System.getProperty("spark.core.connection.connect.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.getOrElse("spark.core.connection.connect.threads.min", "1").toInt,
+ conf.getOrElse("spark.core.connection.connect.threads.max", "8").toInt,
+ conf.getOrElse("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
@@ -593,8 +593,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private[spark] object ConnectionManager {
+ import SparkContext.globalConf
+
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
+ val manager = new ConnectionManager(9999, globalConf)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
None
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 781715108b..4ca3cd390b 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer
import java.net.InetAddress
private[spark] object ReceiverTest {
-
+ import org.apache.spark.SparkContext.globalConf
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
+ val manager = new ConnectionManager(9999, globalConf)
println("Started connection manager with id = " + manager.id)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 777574980f..11c21fc1d5 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import java.net.InetAddress
private[spark] object SenderTest {
-
+ import org.apache.spark.SparkContext.globalConf
def main(args: Array[String]) {
if (args.length < 2) {
@@ -33,7 +33,7 @@ private[spark] object SenderTest {
val targetPort = args(1).toInt
val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
- val manager = new ConnectionManager(0)
+ val manager = new ConnectionManager(0, globalConf)
println("Started connection manager with id = " + manager.id)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index b1e1576dad..81b3104afd 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -23,20 +23,20 @@ import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.util.CharsetUtil
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.network.ConnectionManagerId
import scala.collection.JavaConverters._
import org.apache.spark.storage.BlockId
-private[spark] class ShuffleCopier extends Logging {
+private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
def getBlock(host: String, port: Int, blockId: BlockId,
resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
- val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt
+ val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt
val fc = new FileClient(handler, connectTimeout)
try {
@@ -107,7 +107,7 @@ private[spark] object ShuffleCopier extends Logging {
val tasks = (for (i <- Range(0, threads)) yield {
Executors.callable(new Runnable() {
def run() {
- val copier = new ShuffleCopier()
+ val copier = new ShuffleCopier(SparkContext.globalConf)
copier.getBlock(host, port, blockId, echoResultCollectCallBack)
}
})
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a712ef1c27..9fbe002748 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -75,6 +75,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
private[spark] object CheckpointRDD extends Logging {
+ import SparkContext.{globalConf => conf}
+
def splitIdToFile(splitId: Int): String = {
"part-%05d".format(splitId)
}
@@ -92,7 +94,7 @@ private[spark] object CheckpointRDD extends Logging {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
@@ -122,7 +124,7 @@ private[spark] object CheckpointRDD extends Logging {
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea45566ad1..f8b1a6932e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -81,6 +81,7 @@ abstract class RDD[T: ClassTag](
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
+ private[spark] def conf = sc.conf
// =======================================================================
// Methods that should be implemented by subclasses of RDD
// =======================================================================
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 60927831a1..3f55cd5642 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
class JobLogger(val user: String, val logDirName: String)
extends SparkListener with Logging {
- def this() = this(System.getProperty("user.name", "<unknown>"),
+ def this() = this(System.getProperty("user.name", "<unknown>"),
String.valueOf(System.currentTimeMillis()))
private val logDir =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 356fe56bf3..9002d33cda 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import scala.xml.XML
@@ -49,10 +49,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
}
}
-private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+ val schedulerAllocFile = Option(conf.get("spark.scheduler.allocation.file"))
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 66ab8ea4cd..7e231ec44c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -49,11 +49,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
extends TaskScheduler
with Logging
{
+ val conf = sc.conf
// How often to check for speculative tasks
- val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+ val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
+ val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong
// ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
@@ -90,7 +91,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.scheduler.mode", "FIFO"))
+ conf.getOrElse("spark.scheduler.mode", "FIFO"))
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -108,7 +109,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool)
+ new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools()
@@ -119,7 +120,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
override def start() {
backend.start()
- if (System.getProperty("spark.speculation", "false").toBoolean) {
+ if (conf.getOrElse("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index bf494aa64d..398b0cefbf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -50,15 +50,16 @@ private[spark] class ClusterTaskSetManager(
extends TaskSetManager
with Logging
{
+ val conf = sched.sc.conf
// CPUs to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+ val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
// Maximum times a task is allowed to fail before failing the job
- val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
+ val MAX_TASK_FAILURES = conf.getOrElse("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation
- val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
- val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+ val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+ val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -117,7 +118,7 @@ private[spark] class ClusterTaskSetManager(
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
- System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+ conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
@@ -677,14 +678,14 @@ private[spark] class ClusterTaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = System.getProperty("spark.locality.wait", "3000")
+ val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
- System.getProperty("spark.locality.wait.process", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
- System.getProperty("spark.locality.wait.node", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
- System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7e22c843bf..40555903ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.util.Try
import akka.actor._
import akka.pattern.ask
@@ -46,8 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
-
- private val timeout = AkkaUtils.askTimeout
+ val conf = scheduler.sc.conf
+ private val timeout = AkkaUtils.askTimeout(conf)
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -61,7 +62,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
- val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+ val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong
import context.dispatcher
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
@@ -162,7 +163,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
override def start() {
val properties = new ArrayBuffer[(String, String)]
- val iterator = System.getProperties.entrySet.iterator
+ val iterator = scheduler.sc.conf.getAllConfiguration
while (iterator.hasNext) {
val entry = iterator.next
val (key, value) = (entry.getKey.toString, entry.getValue.toString)
@@ -170,6 +171,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
properties += ((key, value))
}
}
+ //TODO (prashant) send conf instead of properties
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
@@ -208,7 +210,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
driverActor ! KillTask(taskId, executorId)
}
- override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+ override def defaultParallelism() = Try(conf.get("spark.default.parallelism")).toOption
.map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
// Called by subclasses when notified of a lost worker
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index e8fecec4a6..d01329b2b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -31,13 +31,13 @@ private[spark] class SimrSchedulerBackend(
val tmpPath = new Path(driverFilePath + "_tmp")
val filePath = new Path(driverFilePath)
- val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
+ val maxCores = conf.getOrElse("spark.simr.executor.cores", "1").toInt
override def start() {
super.start()
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val conf = new Configuration()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7127a72d6d..d6b8ac2d57 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -36,14 +36,14 @@ private[spark] class SparkDeploySchedulerBackend(
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
- val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.getOrElse("spark.cores.max", Int.MaxValue.toString).toInt
override def start() {
super.start()
// The endpoint for executors to talk to us
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command(
@@ -52,7 +52,7 @@ private[spark] class SparkDeploySchedulerBackend(
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
- client = new Client(sc.env.actorSystem, masters, appDesc, this)
+ client = new Client(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index e68c527713..ff6cc37f1d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -31,7 +31,8 @@ import org.apache.spark.util.Utils
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
extends Logging {
- private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
+
+ private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads", "4").toInt
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
THREADS, "Result resolver thread")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 84fe3094cc..2a3b0e15f7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -61,7 +61,7 @@ private[spark] class CoarseMesosSchedulerBackend(
var driver: SchedulerDriver = null
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.getOrElse("spark.cores.max", Int.MaxValue.toString).toInt
// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
@@ -76,7 +76,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
+ val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores", "0").toInt
var nextMesosTaskId = 0
@@ -121,10 +121,10 @@ private[spark] class CoarseMesosSchedulerBackend(
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val uri = System.getProperty("spark.executor.uri")
+ val uri = conf.get("spark.executor.uri")
if (uri == null) {
val runScript = new File(sparkHome, "spark-class").getCanonicalPath
command.setValue(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 50cbc2ca92..9bb92b4f01 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -101,7 +101,7 @@ private[spark] class MesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val uri = System.getProperty("spark.executor.uri")
+ val uri = sc.conf.get("spark.executor.uri")
if (uri == null) {
command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
} else {
@@ -341,5 +341,5 @@ private[spark] class MesosSchedulerBackend(
}
// TODO: query Mesos for number of cores
- override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
+ override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism", "8").toInt
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 01e95162c0..6069c1db3a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -80,6 +80,7 @@ private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val
with Logging {
val env = SparkEnv.get
+ val conf = env.conf
val attemptId = new AtomicInteger
var dagScheduler: DAGScheduler = null
@@ -91,7 +92,7 @@ private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.scheduler.mode", "FIFO"))
+ conf.getOrElse("spark.scheduler.mode", "FIFO"))
val activeTaskSets = new HashMap[String, LocalTaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -106,7 +107,7 @@ private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool)
+ new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools()
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index e748c2275d..17cec81038 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -25,18 +25,20 @@ import com.esotericsoftware.kryo.{KryoException, Kryo}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
-import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.{SparkContext, SparkConf, SerializableWritable, Logging}
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
+import scala.util.Try
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/
class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
+ private val conf = SparkContext.globalConf
private val bufferSize = {
- System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+ conf.getOrElse("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
}
def newKryoOutput() = new KryoOutput(bufferSize)
@@ -48,7 +50,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Do this before we invoke the user registrator so the user registrator can override this.
- kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
+ kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking", "true").toBoolean)
for (cls <- KryoSerializer.toRegister) kryo.register(cls)
@@ -58,7 +60,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
// Allow the user to register their own classes by setting spark.kryo.registrator
try {
- Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+ Try(conf.get("spark.kryo.registrator")).toOption.foreach { regCls =>
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index e51c5b30a3..ee2ae471a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -312,7 +312,7 @@ object BlockFetcherIterator {
logDebug("Sending request for %d blocks (%s) from %s".format(
req.blocks.size, Utils.bytesToString(req.size), req.address.host))
val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
- val cpier = new ShuffleCopier
+ val cpier = new ShuffleCopier(blockManager.conf)
cpier.getBlocks(cmId, req.blocks, putResult)
logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )
}
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
fetchRequestsSync.put(request)
}
- copiers = startCopiers(System.getProperty("spark.shuffle.copier.threads", "6").toInt)
+ copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads", "6").toInt)
logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
Utils.getUsedTimeMs(startTime))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 19a025a329..ffd166e93a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -30,7 +30,7 @@ import scala.concurrent.duration._
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
-import org.apache.spark.{Logging, SparkEnv, SparkException}
+import org.apache.spark.{SparkConf, Logging, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -43,12 +43,13 @@ private[spark] class BlockManager(
actorSystem: ActorSystem,
val master: BlockManagerMaster,
val defaultSerializer: Serializer,
- maxMemory: Long)
+ maxMemory: Long,
+ val conf: SparkConf)
extends Logging {
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")))
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -57,12 +58,12 @@ private[spark] class BlockManager(
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
- val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
- val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
+ val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean
+ val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
}
- val connectionManager = new ConnectionManager(0)
+ val connectionManager = new ConnectionManager(0, conf)
implicit val futureExecContext = connectionManager.futureExecContext
val blockManagerId = BlockManagerId(
@@ -71,14 +72,14 @@ private[spark] class BlockManager(
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
val maxBytesInFlight =
- System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
// Whether to compress broadcast variables that are stored
- val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
// Whether to compress shuffle output that are stored
- val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
+ val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
- val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+ val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean
val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
@@ -115,8 +116,8 @@ private[spark] class BlockManager(
* Construct a BlockManager with a memory limit set based on system properties.
*/
def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
- serializer: Serializer) = {
- this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ serializer: Serializer, conf: SparkConf) = {
+ this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties, conf)
}
/**
@@ -439,7 +440,7 @@ private[spark] class BlockManager(
: BlockFetcherIterator = {
val iter =
- if (System.getProperty("spark.shuffle.use.netty", "false").toBoolean) {
+ if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
} else {
new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -465,7 +466,8 @@ private[spark] class BlockManager(
def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
- new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
+ val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean
+ new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
}
/**
@@ -856,19 +858,19 @@ private[spark] class BlockManager(
private[spark] object BlockManager extends Logging {
-
+ import org.apache.spark.SparkContext.{globalConf => conf}
val ID_GENERATOR = new IdGenerator
def getMaxMemoryFromSystemProperties: Long = {
- val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
+ val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
def getHeartBeatFrequencyFromSystemProperties: Long =
- System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+ conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
def getDisableHeartBeatsForTesting: Boolean =
- System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+ conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e1d68ef592..fde7d63a68 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -23,19 +23,20 @@ import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
import akka.pattern.ask
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkConf, Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
+class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
+ conf: SparkConf) extends Logging {
- val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
- val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
+ val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 21022e1cfb..05502e4451 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -27,7 +27,7 @@ import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkConf, Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -36,7 +36,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* all slaves' block managers.
*/
private[spark]
-class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor with Logging {
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo =
@@ -48,14 +48,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- private val akkaTimeout = AkkaUtils.askTimeout
+ private val akkaTimeout = AkkaUtils.askTimeout(conf)
initLogging()
- val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+ val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs",
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
- val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ val checkTimeoutInterval = conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs",
"60000").toLong
var timeoutCheckingTask: Cancellable = null
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index b4451fc7b8..61e63c60d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -74,7 +74,8 @@ class DiskBlockObjectWriter(
file: File,
serializer: Serializer,
bufferSize: Int,
- compressStream: OutputStream => OutputStream)
+ compressStream: OutputStream => OutputStream,
+ syncWrites: Boolean)
extends BlockObjectWriter(blockId)
with Logging
{
@@ -97,8 +98,6 @@ class DiskBlockObjectWriter(
override def flush() = out.flush()
}
- private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
-
/** The file channel, used for repositioning / truncating the file. */
private var channel: FileChannel = null
private var bs: OutputStream = null
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index fcd2e97982..8f528babd4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
+ private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories", "64").toInt
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e828e1d1c5..850d3178dd 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -27,6 +27,8 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import scala.util.Try
+import org.apache.spark.SparkConf
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -59,12 +61,13 @@ private[spark] trait ShuffleWriterGroup {
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
+ def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
// TODO: Remove this once the shuffle file consolidation feature is stable.
val consolidateShuffleFiles =
- System.getProperty("spark.shuffle.consolidateFiles", "false").toBoolean
+ conf.getOrElse("spark.shuffle.consolidateFiles", "false").toBoolean
- private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+ private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb", "100").toInt * 1024
/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a8db37ded1..b3b3893393 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -22,6 +22,7 @@ import akka.actor._
import java.util.concurrent.ArrayBlockingQueue
import util.Random
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
/**
* This class tests the BlockManager and MemoryStore for thread safety and
@@ -91,11 +92,12 @@ private[spark] object ThreadingTest {
def main(args: Array[String]) {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
+ val conf = SparkContext.globalConf
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
val blockManager = new BlockManager(
- "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
+ "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index f1d86c0221..0ce8d9c8c4 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext) extends Logging {
val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
- val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
+ val port = sc.conf.getOrElse("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
var boundPort: Option[Int] = None
var server: Option[Server] = None
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index fcd1b518d0..14751e8e8e 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -30,6 +30,8 @@ import org.apache.spark.scheduler.SchedulingMode
* Usage: ./run spark.ui.UIWorkloadGenerator [master]
*/
private[spark] object UIWorkloadGenerator {
+
+ import SparkContext.{globalConf => conf}
val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 5000
@@ -43,7 +45,7 @@ private[spark] object UIWorkloadGenerator {
val appName = "Spark UI Tester"
if (schedulingMode == SchedulingMode.FAIR) {
- System.setProperty("spark.scheduler.mode", "FAIR")
+ conf.set("spark.scheduler.mode", "FAIR")
}
val sc = new SparkContext(master, appName)
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index c5bf2acc9e..b637d37517 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -63,7 +63,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
val classPathEntries = classPathProperty._2
- .split(System.getProperty("path.separator", ":"))
+ .split(sc.conf.getOrElse("path.separator", ":"))
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 6b854740d6..f01a1380b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
*/
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
- val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+ val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages", "1000").toInt
val DEFAULT_POOL_NAME = "default"
val stageIdToPool = new HashMap[Int, String]()
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 1c8b51b8bc..76febd5702 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,6 +21,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
+import org.apache.spark.SparkConf
/**
* Various utility classes for working with Akka.
@@ -37,22 +38,22 @@ private[spark] object AkkaUtils {
* If indestructible is set to true, the Actor System will continue running in the event
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
- def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
- : (ActorSystem, Int) = {
+ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
+ conf: SparkConf): (ActorSystem, Int) = {
- val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
- val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
+ val akkaThreads = conf.getOrElse("spark.akka.threads", "4").toInt
+ val akkaBatchSize = conf.getOrElse("spark.akka.batchSize", "15").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "100").toInt
+ val akkaTimeout = conf.getOrElse("spark.akka.timeout", "100").toInt
- val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
+ val akkaFrameSize = conf.getOrElse("spark.akka.frameSize", "10").toInt
val lifecycleEvents =
- if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+ if (conf.getOrElse("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "600").toInt
+ val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
- System.getProperty("spark.akka.failure-detector.threshold", "300.0").toDouble
- val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "1000").toInt
+ conf.getOrElse("spark.akka.failure-detector.threshold", "300.0").toDouble
+ val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval", "1000").toInt
val akkaConf = ConfigFactory.parseString(
s"""
@@ -87,7 +88,7 @@ private[spark] object AkkaUtils {
}
/** Returns the default Spark timeout to use for Akka ask operations. */
- def askTimeout: FiniteDuration = {
- Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
+ def askTimeout(conf: SparkConf): FiniteDuration = {
+ Duration.create(conf.getOrElse("spark.akka.askTimeout", "30").toLong, "seconds")
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 7b41ef89f1..bf71d17a21 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,7 +18,7 @@
package org.apache.spark.util
import java.util.{TimerTask, Timer}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
/**
@@ -66,21 +66,21 @@ object MetadataCleanerType extends Enumeration {
}
object MetadataCleaner {
-
+ private val conf = SparkContext.globalConf
// using only sys props for now : so that workers can also get to it while preserving earlier behavior.
- def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
+ def getDelaySeconds = conf.getOrElse("spark.cleaner.ttl", "3500").toInt //TODO: this is to fix tests for time being
def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
- System.getProperty(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
+ conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
}
def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
- System.setProperty(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
+ conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}
def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
// override for all ?
- System.setProperty("spark.cleaner.ttl", delay.toString)
+ conf.set("spark.cleaner.ttl", delay.toString)
if (resetAll) {
for (cleanerType <- MetadataCleanerType.values) {
System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index a25b37a2a9..1407c39bfb 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -30,7 +30,7 @@ import java.lang.management.ManagementFactory
import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, SparkContext, Logging}
/**
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
@@ -41,6 +41,7 @@ import org.apache.spark.Logging
*/
private[spark] object SizeEstimator extends Logging {
+ private def conf = SparkContext.globalConf
// Sizes of primitive types
private val BYTE_SIZE = 1
private val BOOLEAN_SIZE = 1
@@ -90,8 +91,8 @@ private[spark] object SizeEstimator extends Logging {
}
private def getIsCompressedOops : Boolean = {
- if (System.getProperty("spark.test.useCompressedOops") != null) {
- return System.getProperty("spark.test.useCompressedOops").toBoolean
+ if (conf.getOrElse("spark.test.useCompressedOops", null) != null) {
+ return conf.get("spark.test.useCompressedOops").toBoolean
}
try {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3f7858d2de..fd5888e525 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.{SparkContext, SparkException, Logging}
/**
@@ -44,6 +44,7 @@ import org.apache.spark.{SparkException, Logging}
*/
private[spark] object Utils extends Logging {
+ private lazy val conf = SparkContext.globalConf
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -312,7 +313,7 @@ private[spark] object Utils extends Logging {
* multiple paths.
*/
def getLocalDir: String = {
- System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+ conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
}
/**
@@ -398,7 +399,7 @@ private[spark] object Utils extends Logging {
}
def localHostPort(): String = {
- val retval = System.getProperty("spark.hostPort", null)
+ val retval = conf.getOrElse("spark.hostPort", null)
if (retval == null) {
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
return localHostName()
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 271dc905bc..10b8b441fd 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
-
+ private val conf = new SparkConf
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
@@ -48,14 +48,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master start and stop") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
@@ -74,7 +74,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
@@ -96,16 +96,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
- val masterTracker = new MapOutputTrackerMaster()
+ val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = Left(actorSystem.actorOf(
Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
- val slaveTracker = new MapOutputTracker()
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
+ val slaveTracker = new MapOutputTracker(conf)
slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
"akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 7181333adf..4ecdde0001 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -19,17 +19,19 @@ package org.apache.spark.metrics
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.SparkConf
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
-
+ var conf: SparkConf = null
before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
System.setProperty("spark.metrics.conf", filePath)
+ conf = new SparkConf
}
test("MetricsSystem with default config") {
- val metricsSystem = MetricsSystem.createMetricsSystem("default")
+ val metricsSystem = MetricsSystem.createMetricsSystem("default", conf)
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
@@ -39,7 +41,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
}
test("MetricsSystem with sources add") {
- val metricsSystem = MetricsSystem.createMetricsSystem("test")
+ val metricsSystem = MetricsSystem.createMetricsSystem("test", conf)
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 706d84a58b..2aa259daf3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,21 +17,14 @@
package org.apache.spark.scheduler
-import scala.collection.mutable.{Map, HashMap}
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.LocalSparkContext
-import org.apache.spark.MapOutputTrackerMaster
-import org.apache.spark.SparkContext
-import org.apache.spark.Partition
-import org.apache.spark.TaskContext
-import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
-import org.apache.spark.{FetchFailed, Success, TaskEndReason}
+import scala.Tuple2
+import scala.collection.mutable.{HashMap, Map}
+
+import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.scalatest.{BeforeAndAfter, FunSuite}
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
@@ -46,7 +39,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
* and capturing the resulting TaskSets from the mock TaskScheduler.
*/
class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
+ val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
@@ -74,7 +67,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
*/
val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
// stub out BlockManagerMaster.getLocations to use our cacheLocations
- val blockManagerMaster = new BlockManagerMaster(null) {
+ val blockManagerMaster = new BlockManagerMaster(null, conf) {
override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
blockIds.map {
_.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
@@ -99,7 +92,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
taskSets.clear()
cacheLocations.clear()
results.clear()
- mapOutputTracker = new MapOutputTrackerMaster()
+ mapOutputTracker = new MapOutputTrackerMaster(conf)
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 002368ff55..dd122615ad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -95,7 +95,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
- val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
+ val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
joblogger.getLogDir should be ("/tmp/spark-%s".format(user))
joblogger.getJobIDtoPrintWriter.size should be (1)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 95d3553d91..34d2e4cb8c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -169,7 +169,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()
assert(rootPool.getSchedulableByName("default") != null)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index bb28a31a99..2bb827c022 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -81,8 +81,8 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
-
- val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+ private val conf = new SparkConf
+ val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
index 27c2d53361..618fae7c16 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext, SparkEnv}
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
import org.apache.spark.storage.TaskResultBlockId
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 5b4d63b954..4ef5538951 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,8 +31,10 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.util.{SizeEstimator, Utils, AkkaUtils, ByteBufferInputStream}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.{SparkConf, SparkContext}
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+ private val conf = new SparkConf
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
@@ -42,7 +44,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- System.setProperty("spark.kryoserializer.buffer.mb", "1")
+ conf.set("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
// Implicitly convert strings to BlockIds for test clarity.
@@ -50,22 +52,23 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
before {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf)
this.actorSystem = actorSystem
- System.setProperty("spark.driver.port", boundPort.toString)
- System.setProperty("spark.hostPort", "localhost:" + boundPort)
+ conf.set("spark.driver.port", boundPort.toString)
+ conf.set("spark.hostPort", "localhost:" + boundPort)
master = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- oldArch = System.setProperty("os.arch", "amd64")
- oldOops = System.setProperty("spark.test.useCompressedOops", "true")
- oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
+ System.setProperty("os.arch", "amd64")
+ conf.set("os.arch", "amd64")
+ conf.set("spark.test.useCompressedOops", "true")
+ conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
// Set some value ...
- System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111)
+ conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111)
}
after {
@@ -86,13 +89,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master = null
if (oldArch != null) {
- System.setProperty("os.arch", oldArch)
+ conf.set("os.arch", oldArch)
} else {
System.clearProperty("os.arch")
}
if (oldOops != null) {
- System.setProperty("spark.test.useCompressedOops", oldOops)
+ conf.set("spark.test.useCompressedOops", oldOops)
} else {
System.clearProperty("spark.test.useCompressedOops")
}
@@ -133,7 +136,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 1 manager interaction") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -163,8 +166,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 2 managers interaction") {
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
- store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
+ store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000, conf)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -179,7 +182,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -227,7 +230,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing rdd") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -261,7 +264,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -277,7 +280,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("reregistration on block update") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -296,7 +299,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration doesn't dead lock") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = List(new Array[Byte](400))
@@ -333,7 +336,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -352,7 +355,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -371,7 +374,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -390,7 +393,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -413,7 +416,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -426,7 +429,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -441,7 +444,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -456,7 +459,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -471,7 +474,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -486,7 +489,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -511,7 +514,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -535,7 +538,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -581,7 +584,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -591,53 +594,53 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
- System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+ conf.set("spark.shuffle.compress", "true")
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
+ conf.set("spark.shuffle.compress", "false")
+ store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf)
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
"shuffle_0_0_0 was compressed")
store.stop()
store = null
- System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
+ conf.set("spark.broadcast.compress", "true")
+ store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf)
store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
"broadcast_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
+ conf.set("spark.broadcast.compress", "false")
+ store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf)
store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
- System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
+ conf.set("spark.rdd.compress", "true")
+ store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf)
store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
+ conf.set("spark.rdd.compress", "false")
+ store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf)
store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()
@@ -651,7 +654,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200, conf)
// The put should fail since a1 is not serializable.
class UnserializableClass
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 070982e798..f940448abd 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -17,15 +17,18 @@
package org.apache.spark.storage
-import java.io.{FileWriter, File}
+import java.io.{File, FileWriter}
import scala.collection.mutable
import com.google.common.io.Files
+import org.apache.spark.SparkConf
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+import scala.util.Try
+import akka.actor.{Props, ActorSelection, ActorSystem}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
-
+ private val testConf = new SparkConf
val rootDir0 = Files.createTempDir()
rootDir0.deleteOnExit()
val rootDir1 = Files.createTempDir()
@@ -36,10 +39,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
// This suite focuses primarily on consolidation features,
// so we coerce consolidation if not already enabled.
val consolidateProp = "spark.shuffle.consolidateFiles"
- val oldConsolidate = Option(System.getProperty(consolidateProp))
- System.setProperty(consolidateProp, "true")
+ val oldConsolidate = Try(testConf.get(consolidateProp)).toOption
+ testConf.set(consolidateProp, "true")
val shuffleBlockManager = new ShuffleBlockManager(null) {
+ override def conf = testConf.clone
var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
}
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 5aff26f9fc..a5facd5bbd 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkContext
class DummyClass1 {}
@@ -139,7 +140,8 @@ class SizeEstimatorSuite
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
-
+ SparkContext.globalConf.set("os.arch", "amd64")
+ SparkContext.globalConf.set("spark.test.useCompressedOops", "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
index 72b5c7b88e..12c430be27 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
@@ -36,16 +36,18 @@ object WikipediaPageRank {
System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
System.exit(-1)
}
-
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
val inputFile = args(0)
val threshold = args(1).toDouble
val numPartitions = args(2).toInt
val host = args(3)
val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRank")
+
+ sparkConf.setMasterUrl(host).setAppName("WikipediaPageRank")
+ val sc = new SparkContext(sparkConf)
// Parse the Wikipedia page data into a graph
val input = sc.textFile(inputFile)
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index ddf6855325..5bf0b7a24a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -34,15 +34,19 @@ object WikipediaPageRankStandalone {
System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
System.exit(-1)
}
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.serializer", "spark.bagel.examples.WPRSerializer")
- System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer")
val inputFile = args(0)
val threshold = args(1).toDouble
val numIterations = args(2).toInt
val host = args(3)
val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRankStandalone")
+
+ sparkConf.setMasterUrl(host).setAppName("WikipediaPageRankStandalone")
+
+ val sc = new SparkContext(sparkConf)
val input = sc.textFile(inputFile)
val partitioner = new HashPartitioner(sc.defaultParallelism)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 50e3f9639c..2402409e6e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -26,6 +26,7 @@ import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
+import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
@@ -116,7 +117,7 @@ object FeederActor {
val Seq(host, port) = args.toSeq
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = new SparkConf)._1
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 36853acab5..2f2d106f86 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -578,14 +578,13 @@ object ALS {
val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false
val alpha = if (args.length >= 8) args(7).toDouble else 1
val blocks = if (args.length == 9) args(8).toInt else -1
-
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
- System.setProperty("spark.kryo.referenceTracking", "false")
- System.setProperty("spark.kryoserializer.buffer.mb", "8")
- System.setProperty("spark.locality.wait", "10000")
-
val sc = new SparkContext(master, "ALS")
+ sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ sc.conf.set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+ sc.conf.set("spark.kryo.referenceTracking", "false")
+ sc.conf.set("spark.kryoserializer.buffer.mb", "8")
+ sc.conf.set("spark.locality.wait", "10000")
+
val ratings = sc.textFile(ratingsFile).map { line =>
val fields = line.split(',')
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index eeeca3ea8a..433268a1dd 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -61,13 +61,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var amClient: AMRMClient[ContainerRequest] = _
// Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// Setup the directories so things go to YARN approved directories rather
// than user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
+ conf.set("spark.local.dir", getLocalDirs())
// Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
@@ -138,10 +138,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
var tries = 0
- val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
while (!driverUp && tries < numTries) {
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
@@ -199,7 +199,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ApplicationMaster.sparkContextRef.synchronized {
var numTries = 0
val waitTime = 10000L
- val maxNumTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
logInfo("Waiting for Spark context initialization ... " + numTries)
numTries = numTries + 1
@@ -265,7 +265,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -343,7 +343,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
- val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+ val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 94678815e8..a322f60864 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -244,7 +244,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+ val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
@@ -499,7 +499,7 @@ object Client {
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+ val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 70be15d0a3..41ac292249 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -35,7 +35,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024 // MB
var workerCores = 1
var numWorkers = 2
- var amQueue = System.getProperty("QUEUE", "default")
+ var amQueue = conf.getOrElse("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index bc31bb2eb0..f7d73f0d83 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -136,8 +136,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
Thread.sleep(100)
}
}
- System.setProperty("spark.driver.host", driverHost)
- System.setProperty("spark.driver.port", driverPort.toString)
+ conf.set("spark.driver.host", driverHost)
+ conf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c27257cda4..71d1cbd416 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -254,8 +254,8 @@ private[yarn] class YarnAllocationHandler(
} else {
val workerId = workerIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b206780c78..6feaaff014 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,8 +52,8 @@ private[spark] class YarnClientSchedulerBackend(
if (workerNumber == null)
workerNumber = defaultWorkerNumber
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
val argsArray = Array[String](
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ab96cfa18b..ffb54a24ac 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -244,7 +244,8 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1",
- "com.twitter" % "chill-java" % "0.3.1"
+ "com.twitter" % "chill-java" % "0.3.1",
+ "com.typesafe" % "config" % "1.0.2"
)
)
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 523fd1222d..b2f499e637 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -930,9 +930,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
def createSparkContext(): SparkContext = {
val uri = System.getenv("SPARK_EXECUTOR_URI")
- if (uri != null) {
- System.setProperty("spark.executor.uri", uri)
- }
val master = this.master match {
case Some(m) => m
case None => {
@@ -942,6 +939,10 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
}
val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+ if (uri != null) {
+ sparkContext.conf.set("spark.executor.uri", uri)
+ }
+ sparkContext.conf.set("spark.repl.class.uri", intp.classServer.uri)
echo("Created spark context..")
sparkContext
}
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index e1455ef8a1..0d412e4478 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -34,10 +34,8 @@ import scala.tools.reflect.StdRuntimeTags._
import scala.util.control.ControlThrowable
import util.stackTraceString
-import org.apache.spark.HttpServer
+import org.apache.spark.{SparkContext, HttpServer, SparkEnv, Logging}
import org.apache.spark.util.Utils
-import org.apache.spark.SparkEnv
-import org.apache.spark.Logging
// /** directory to save .class files to */
// private class ReplVirtualDirectory(out: JPrintWriter) extends VirtualDirectory("((memory))", None) {
@@ -91,7 +89,7 @@ import org.apache.spark.Logging
/** Local directory to save .class files too */
val outputDir = {
val tmp = System.getProperty("java.io.tmpdir")
- val rootDir = System.getProperty("spark.repl.classdir", tmp)
+ val rootDir = SparkContext.globalConf.getOrElse("spark.repl.classdir", tmp)
Utils.createTempDir(rootDir)
}
if (SPARK_DEBUG_REPL) {
@@ -112,7 +110,6 @@ import org.apache.spark.Logging
// Start the classServer and store its URI in a spark system property
// (which will be passed to executors so that they can connect to it)
classServer.start()
- System.setProperty("spark.repl.class.uri", classServer.uri)
if (SPARK_DEBUG_REPL) {
echo("Class server started, URI = " + classServer.uri)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914eb5..b8e1427a21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -34,7 +34,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
- val sparkHome = ssc.sc.sparkHome
+ val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
val jars = ssc.sc.jars
val environment = ssc.sc.environment
val graph = ssc.graph
@@ -42,6 +42,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds
+ val sparkConf = ssc.sc.conf
def validate() {
assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
index ed892e33e6..1d23713c80 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
@@ -26,7 +26,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
- val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+ val concurrentJobs = ssc.sc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
@@ -34,7 +34,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
null
}
- val clockClass = System.getProperty(
+ val clockClass = ssc.sc.conf.getOrElse(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
@@ -73,7 +73,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ val jumpTime = ssc.sc.conf.getOrElse("spark.streaming.manualClock.jump", "0").toLong
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index d2c4fdee65..76744223e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -115,7 +115,7 @@ class StreamingContext private (
protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
- new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
+ new SparkContext(cp_.sparkConf, cp_.environment)
} else {
sc_
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d5ae8aef92..8bf761b8cb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,8 +175,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/** A helper actor that communicates with the NetworkInputTracker */
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
- val ip = System.getProperty("spark.driver.host", "localhost")
- val port = System.getProperty("spark.driver.port", "7077").toInt
+ val ip = env.conf.getOrElse("spark.driver.host", "localhost")
+ val port = env.conf.getOrElse("spark.driver.port", "7077").toInt
val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorSelection(url)
val timeout = 5.seconds
@@ -213,7 +213,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
+ val blockInterval = env.conf.getOrElse("spark.streaming.blockInterval", "200").toLong
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index e81287b44e..315bd5443c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.util.ManualClock
*/
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
before {
FileUtils.deleteDirectory(new File(checkpointDir))
@@ -69,7 +69,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
@@ -135,13 +135,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Restart stream computation from the new checkpoint file to see whether that file has
// correct checkpoint data
+ conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
// Adjust manual clock time as if it is being restarted after a delay
- System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7dc82decef..da8f135dd7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -53,7 +53,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
override def checkpointDir = "checkpoint"
before {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
after {
@@ -68,7 +68,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
@@ -113,7 +113,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("flume input stream") {
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
@@ -162,11 +162,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
- System.clearProperty("spark.streaming.clock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -207,7 +207,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(testDir)
// Enable manual clock back again for other tests
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
@@ -218,7 +218,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -262,7 +262,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val topics = Map("my-topic" -> 1)
val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
@@ -285,7 +285,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
MultiThreadTestReceiver.haveAllThreadsFinished = false
// set up the network stream using the test receiver
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.networkStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 2f34e812a1..d1cab0c609 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -28,7 +28,7 @@ import java.io.{ObjectInputStream, IOException}
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.rdd.RDD
/**
@@ -130,6 +130,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Whether to actually wait in real time before changing manual clock
def actuallyWait = false
+ def conf = new SparkConf().setMasterUrl(master).setAppName(framework).set("spark.cleaner.ttl", "3600")
/**
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
@@ -139,9 +140,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
operation: DStream[U] => DStream[V],
numPartitions: Int = numInputPartitions
): StreamingContext = {
-
+ val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(sc, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
@@ -165,9 +166,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
): StreamingContext = {
-
+ val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(sc, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 240ed8b32a..1dd38dd13e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -58,13 +58,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
// default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
- System.setProperty("spark.local.dir", getLocalDirs())
+ conf.set("spark.local.dir", getLocalDirs())
// Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
@@ -165,10 +165,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
while(!driverUp && tries < numTries) {
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
@@ -226,7 +226,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
val waitTime = 10000L
- val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
@@ -294,7 +294,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -377,7 +377,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
- val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+ val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79dd038065..29892e98e3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -230,7 +230,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+ val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
@@ -461,7 +461,7 @@ object Client {
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
- System.setProperty("SPARK_YARN_MODE", "true")
+ conf.set("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
@@ -483,7 +483,7 @@ object Client {
Path.SEPARATOR + LOG4J_PROP)
}
// Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+ val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b3a7886d93..617289f568 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,7 +33,7 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
- var amQueue = System.getProperty("QUEUE", "default")
+ var amQueue = conf.getOrElse("QUEUE", "default")
var amMemory: Int = 512
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 69038844bb..c1e79cbe66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -162,8 +162,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
Thread.sleep(100)
}
}
- System.setProperty("spark.driver.host", driverHost)
- System.setProperty("spark.driver.port", driverPort.toString)
+ conf.set("spark.driver.host", driverHost)
+ conf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 9ab2073529..4c9fee5695 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -239,7 +239,7 @@ private[yarn] class YarnAllocationHandler(
// (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + workerHostname)
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b206780c78..6feaaff014 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,8 +52,8 @@ private[spark] class YarnClientSchedulerBackend(
if (workerNumber == null)
workerNumber = defaultWorkerNumber
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
val argsArray = Array[String](