author     Matei Zaharia <matei@databricks.com>  2014-01-01 22:03:39 -0500
committer  Matei Zaharia <matei@databricks.com>  2014-01-01 22:03:39 -0500
commit     e2c68642c64345434e2034082cf9b299491e9e9f
tree       8367e90bf00d2a40257a6465ad4ee4b5403f0411
parent     45ff8f413d9959b7f464176cd20dc56db3f711af
Miscellaneous fixes from code review.
Also replaced SparkConf.getOrElse with a "get" overload that takes a default value, and added getInt, getLong, etc. to make code that uses these settings simpler later on.
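
A minimal sketch of how caller code reads with the revised SparkConf API (illustrative only, not part of the patch; the "local[4]" master and app name are placeholders, while the keys and defaults are taken from settings touched elsewhere in this diff):

import org.apache.spark.SparkConf

val conf = new SparkConf().setMaster("local[4]").setAppName("ConfExample")

// Before this patch callers wrote conf.getOrElse("spark.akka.threads", "4").toInt;
// the default-taking overload of get now covers that case directly.
val akkaThreads = conf.get("spark.akka.threads", "4").toInt

// The new typed helpers let later code drop the manual string conversion.
val akkaThreadsTyped = conf.getInt("spark.akka.threads", 4)
val askTimeoutSecs   = conf.getLong("spark.akka.askTimeout", 30L)
val memoryFraction   = conf.getDouble("spark.storage.memoryFraction", 0.66)

// get(key) with no default now throws NoSuchElementException for unset keys,
// set rejects null keys and values, and remove(key) deletes an entry.
conf.remove("spark.akka.threads")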
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 60
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 73
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/io/CompressionCodec.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManager.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 12
-rw-r--r--  python/pyspark/conf.py | 12
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 2
49 files changed, 206 insertions(+), 189 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 96239cf4be..98343e9532 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -42,6 +42,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
+ if (key == null) {
+ throw new NullPointerException("null key")
+ }
+ if (value == null) {
+ throw new NullPointerException("null value")
+ }
settings(key) = value
this
}
@@ -51,26 +57,17 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
*/
def setMaster(master: String): SparkConf = {
- if (master != null) {
- settings("spark.master") = master
- }
- this
+ set("spark.master", master)
}
/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
- if (name != null) {
- settings("spark.app.name") = name
- }
- this
+ set("spark.app.name", name)
}
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
- if (!jars.isEmpty) {
- settings("spark.jars") = jars.mkString(",")
- }
- this
+ set("spark.jars", jars.mkString(","))
}
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -84,8 +81,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
* (for example spark.executorEnv.PATH) but this method makes them easier to set.
*/
def setExecutorEnv(variable: String, value: String): SparkConf = {
- settings("spark.executorEnv." + variable) = value
- this
+ set("spark.executorEnv." + variable, value)
}
/**
@@ -112,10 +108,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
* Set the location where Spark is installed on worker nodes.
*/
def setSparkHome(home: String): SparkConf = {
- if (home != null) {
- settings("spark.home") = home
- }
- this
+ set("spark.home", home)
}
/** Set multiple parameters together */
@@ -132,9 +125,20 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
this
}
- /** Get a parameter; throws an exception if it's not set */
+ /** Remove a parameter from the configuration */
+ def remove(key: String): SparkConf = {
+ settings.remove(key)
+ this
+ }
+
+ /** Get a parameter; throws a NoSuchElementException if it's not set */
def get(key: String): String = {
- settings(key)
+ settings.getOrElse(key, throw new NoSuchElementException(key))
+ }
+
+ /** Get a parameter, falling back to a default if not set */
+ def get(key: String, defaultValue: String): String = {
+ settings.getOrElse(key, defaultValue)
}
/** Get a parameter as an Option */
@@ -145,9 +149,19 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Get all parameters as a list of pairs */
def getAll: Array[(String, String)] = settings.clone().toArray
- /** Get a parameter, falling back to a default if not set */
- def getOrElse(k: String, defaultValue: String): String = {
- settings.getOrElse(k, defaultValue)
+ /** Get a parameter as an integer, falling back to a default if not set */
+ def getInt(key: String, defaultValue: Int): Int = {
+ getOption(key).map(_.toInt).getOrElse(defaultValue)
+ }
+
+ /** Get a parameter as a long, falling back to a default if not set */
+ def getLong(key: String, defaultValue: Long): Long = {
+ getOption(key).map(_.toLong).getOrElse(defaultValue)
+ }
+
+ /** Get a parameter as a double, falling back to a default if not set */
+ def getDouble(key: String, defaultValue: Double): Double = {
+ getOption(key).map(_.toDouble).getOrElse(defaultValue)
}
/** Get all executor environment variables set on this SparkConf */
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 46874c41a2..84bd0f7ffd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,12 +22,11 @@ import java.net.URI
import java.util.{UUID, Properties}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.{Map, Set, immutable}
+import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -49,7 +48,8 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util._
+import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
+ClosureCleaner}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -77,7 +77,7 @@ class SparkContext(
* @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
*/
def this(master: String, appName: String, conf: SparkConf) =
- this(conf.clone().setMaster(master).setAppName(appName))
+ this(SparkContext.updatedConf(conf, master, appName))
/**
* Alternative constructor that allows setting common Spark properties directly
@@ -97,13 +97,7 @@ class SparkContext(
environment: Map[String, String] = Map(),
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
{
- this(
- new SparkConf()
- .setMaster(master)
- .setAppName(appName)
- .setJars(jars)
- .setExecutorEnv(environment.toSeq)
- .setSparkHome(sparkHome),
+ this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
preferredNodeLocationData)
}
@@ -175,11 +169,9 @@ class SparkContext(
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
- for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
- val value = System.getenv(key)
- if (value != null) {
- executorEnvs(key) = value
- }
+ for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING");
+ value <- Option(System.getenv(key))) {
+ executorEnvs(key) = value
}
// Since memory can be set with a system property too, use that
executorEnvs("SPARK_MEM") = executorMemory + "m"
@@ -220,7 +212,7 @@ class SparkContext(
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
- val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
+ val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
hadoopConf
}
@@ -733,13 +725,7 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
- if (conf.contains("spark.home")) {
- Some(conf.get("spark.home"))
- } else if (System.getenv("SPARK_HOME") != null) {
- Some(System.getenv("SPARK_HOME"))
- } else {
- None
- }
+ conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME")))
}
/**
@@ -1026,7 +1012,7 @@ object SparkContext {
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
- * their JARs to SparkContext
+ * their JARs to SparkContext.
*/
def jarOfClass(cls: Class[_]): Seq[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
@@ -1043,10 +1029,41 @@ object SparkContext {
}
}
- /** Find the JAR that contains the class of a particular object */
+ /**
+ * Find the JAR that contains the class of a particular object, to make it easy for users
+ * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
+ * your driver program.
+ */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
- // Creates a task scheduler based on a given master URL. Extracted for testing.
+ /**
+ * Creates a modified version of a SparkConf with the parameters that can be passed separately
+ * to SparkContext, to make it easier to write SparkContext's constructors. This ignores
+ * parameters that are passed as the default value of null, instead of throwing an exception
+ * like SparkConf would.
+ */
+ private def updatedConf(
+ conf: SparkConf,
+ master: String,
+ appName: String,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map()): SparkConf =
+ {
+ val res = conf.clone()
+ res.setMaster(master)
+ res.setAppName(appName)
+ if (sparkHome != null) {
+ res.setSparkHome(sparkHome)
+ }
+ if (!jars.isEmpty) {
+ res.setJars(jars)
+ }
+ res.setExecutorEnv(environment.toSeq)
+ res
+ }
+
+ /** Creates a task scheduler based on a given master URL. Extracted for testing. */
private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
: TaskScheduler =
{
@@ -1156,7 +1173,7 @@ object SparkContext {
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
val scheduler = new TaskSchedulerImpl(sc)
- val coarseGrained = sc.conf.getOrElse("spark.mesos.coarse", "false").toBoolean
+ val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d06af8e667..634a94f0a7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -144,17 +144,17 @@ object SparkEnv extends Logging {
// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
- val name = conf.getOrElse(propertyName, defaultClassName)
+ val name = conf.get(propertyName, defaultClassName)
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
}
val serializerManager = new SerializerManager
val serializer = serializerManager.setDefault(
- conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
+ conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
val closureSerializer = serializerManager.get(
- conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
+ conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
conf)
def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
@@ -162,8 +162,8 @@ object SparkEnv extends Logging {
logInfo("Registering " + name)
Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
- val driverHost: String = conf.getOrElse("spark.driver.host", "localhost")
- val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt
+ val driverHost: String = conf.get("spark.driver.host", "localhost")
+ val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 05fd824254..32cc70e8c9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {
- val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.get("spark.buffer.size", "65536").toInt
override def getPartitions = parent.partitions
@@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
Utils.checkHost(serverHost, "Expected hostname")
- val bufferSize = SparkEnv.get.conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index be99d229ef..0fc478a419 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -31,7 +31,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
override def toString = "Broadcast(" + id + ")"
}
-private[spark]
+private[spark]
class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging with Serializable {
private var initialized = false
@@ -43,7 +43,7 @@ class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging
private def initialize() {
synchronized {
if (!initialized) {
- val broadcastFactoryClass = conf.getOrElse(
+ val broadcastFactoryClass = conf.get(
"spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
broadcastFactory =
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 47528bcee8..db596d5fcc 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -92,8 +92,8 @@ private object HttpBroadcast extends Logging {
def initialize(isDriver: Boolean, conf: SparkConf) {
synchronized {
if (!initialized) {
- bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
- compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
+ bufferSize = conf.get("spark.buffer.size", "65536").toInt
+ compress = conf.get("spark.broadcast.compress", "true").toBoolean
if (isDriver) {
createServer(conf)
conf.set("spark.httpBroadcast.uri", serverUri)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 00ec3b971b..9530938278 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -180,7 +180,7 @@ extends Logging {
initialized = false
}
- lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024
+ lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024
def blockifyObject[T](obj: T): TorrentInfo = {
val byteArray = Utils.serialize[T](obj)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9c89e36b14..7b696cfcca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -43,11 +43,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val conf = new SparkConf
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt
- val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "")
- val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE")
+ val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+ val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -88,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 7ce83f9c36..e7f3224091 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -27,8 +27,8 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
-
- // Check for settings in environment variables
+
+ // Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
host = System.getenv("SPARK_MASTER_HOST")
}
@@ -38,7 +38,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
- if (conf.get("master.ui.port") != null) {
+ if (conf.contains("master.ui.port")) {
webUiPort = conf.get("master.ui.port").toInt
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 60c7a7c2d6..999090ad74 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -37,7 +37,7 @@ import org.apache.spark.{SparkConf, Logging}
*/
private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
conf: SparkConf) extends Logging {
- val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "")
+ val ZK_URL = conf.get("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
val ZK_TIMEOUT_MILLIS = 30000
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index a61597bbdf..77c23fb9fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -28,7 +28,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
masterUrl: String, conf: SparkConf)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
- val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+ val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private val watcher = new ZooKeeperWatcher()
private val zk = new SparkZooKeeperSession(this, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 245a558a59..52000d4f9c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -27,7 +27,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
with SparkZooKeeperWatcher
with Logging
{
- val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
val zk = new SparkZooKeeperSession(this, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f844fcbbfc..fcaf4e92b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -55,7 +55,7 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
- val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4
+ val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index a801d85770..c382034c99 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -37,7 +37,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val timeout = AkkaUtils.askTimeout(worker.conf)
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
- worker.conf.getOrElse("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+ worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
var server: Option[Server] = None
var boundPort: Option[Int] = None
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5b70165c35..3c92c205ea 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -302,7 +302,7 @@ private[spark] class Executor(
* new classes defined by the REPL as the user types code
*/
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
- val classUri = conf.getOrElse("spark.repl.class.uri", null)
+ val classUri = conf.get("spark.repl.class.uri", null)
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
try {
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 075a18b068..a1e98845f6 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -39,7 +39,7 @@ trait CompressionCodec {
private[spark] object CompressionCodec {
def createCodec(conf: SparkConf): CompressionCodec = {
- createCodec(conf, conf.getOrElse(
+ createCodec(conf, conf.get(
"spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
}
@@ -71,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
- val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt
+ val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 0e41c73ce7..9930537b34 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -65,7 +65,7 @@ import org.apache.spark.metrics.source.Source
private[spark] class MetricsSystem private (val instance: String,
conf: SparkConf) extends Logging {
- val confFile = conf.getOrElse("spark.metrics.conf", null)
+ val confFile = conf.get("spark.metrics.conf", null)
val metricsConfig = new MetricsConfig(Option(confFile))
val sinks = new mutable.ArrayBuffer[Sink]
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 697096fa76..46c40d0a2a 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
private val selector = SelectorProvider.provider.openSelector()
private val handleMessageExecutor = new ThreadPoolExecutor(
- conf.getOrElse("spark.core.connection.handler.threads.min", "20").toInt,
- conf.getOrElse("spark.core.connection.handler.threads.max", "60").toInt,
- conf.getOrElse("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+ conf.get("spark.core.connection.handler.threads.min", "20").toInt,
+ conf.get("spark.core.connection.handler.threads.max", "60").toInt,
+ conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val handleReadWriteExecutor = new ThreadPoolExecutor(
- conf.getOrElse("spark.core.connection.io.threads.min", "4").toInt,
- conf.getOrElse("spark.core.connection.io.threads.max", "32").toInt,
- conf.getOrElse("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+ conf.get("spark.core.connection.io.threads.min", "4").toInt,
+ conf.get("spark.core.connection.io.threads.max", "32").toInt,
+ conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
// Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
- conf.getOrElse("spark.core.connection.connect.threads.min", "1").toInt,
- conf.getOrElse("spark.core.connection.connect.threads.max", "8").toInt,
- conf.getOrElse("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+ conf.get("spark.core.connection.connect.threads.min", "1").toInt,
+ conf.get("spark.core.connection.connect.threads.max", "8").toInt,
+ conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index db28ddf9ac..b729eb11c5 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
- val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt
+ val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt
val fc = new FileClient(handler, connectTimeout)
try {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 172ba6b01c..6d4f46125f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -97,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
- val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
@@ -131,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging {
): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(broadcastedConf.value.value)
- val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
+ val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 29b0247f8a..e22b1e53e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends Logging {
- private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads", "4").toInt
+ private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
THREADS, "Result resolver thread")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bffd990e16..d94b706854 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,15 +51,15 @@ private[spark] class TaskSchedulerImpl(
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
- def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt)
+ def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt)
val conf = sc.conf
// How often to check for speculative tasks
- val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong
+ val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong
+ val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong
// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
@@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl(
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- conf.getOrElse("spark.scheduler.mode", "FIFO"))
+ conf.get("spark.scheduler.mode", "FIFO"))
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl(
override def start() {
backend.start()
- if (!isLocal && conf.getOrElse("spark.speculation", "false").toBoolean) {
+ if (!isLocal && conf.get("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b99664ae00..67ad99a4d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -57,11 +57,11 @@ private[spark] class TaskSetManager(
val conf = sched.sc.conf
// CPUs to request per task
- val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
+ val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt
// Quantile of tasks at which to start speculation
- val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
- val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
+ val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble
+ val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -116,7 +116,7 @@ private[spark] class TaskSetManager(
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
- conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
+ conf.get("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
@@ -678,14 +678,14 @@ private[spark] class TaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
+ val defaultWait = conf.get("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
- conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
+ conf.get("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
- conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
+ conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
- conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
+ conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b4a3ecca39..2f5bcafe40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
-import scala.util.Try
import akka.actor._
import akka.pattern.ask
@@ -64,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
- val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong
+ val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong
import context.dispatcher
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
@@ -209,8 +208,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
driverActor ! KillTask(taskId, executorId)
}
- override def defaultParallelism() = Try(conf.get("spark.default.parallelism")).toOption
- .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+ override def defaultParallelism(): Int = {
+ conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse(
+ math.max(totalCoreCount.get(), 2))
+ }
// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index f41fbbd1f3..b44d1e43c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class SimrSchedulerBackend(
val tmpPath = new Path(driverFilePath + "_tmp")
val filePath = new Path(driverFilePath)
- val maxCores = conf.getOrElse("spark.simr.executor.cores", "1").toInt
+ val maxCores = conf.get("spark.simr.executor.cores", "1").toInt
override def start() {
super.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 224077566d..9858717d13 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend(
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
- val maxCores = conf.getOrElse("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
override def start() {
super.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 9e2cd3f699..d247fa4244 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -62,7 +62,7 @@ private[spark] class CoarseMesosSchedulerBackend(
var driver: SchedulerDriver = null
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = conf.getOrElse("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
@@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores", "0").toInt
+ val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt
var nextMesosTaskId = 0
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index be96382983..c20fc418e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend(
}
// TODO: query Mesos for number of cores
- override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism", "8").toInt
+ override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 2367f3f521..a24a3b04b8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -29,17 +29,14 @@ import org.apache.spark._
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
-import scala.util.Try
-import org.apache.spark.storage.PutBlock
-import org.apache.spark.storage.GetBlock
-import org.apache.spark.storage.GotBlock
+import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/
class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
private val bufferSize = {
- conf.getOrElse("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+ conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
}
def newKryoOutput() = new KryoOutput(bufferSize)
@@ -51,7 +48,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Do this before we invoke the user registrator so the user registrator can override this.
- kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking", "true").toBoolean)
+ kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean)
for (cls <- KryoSerializer.toRegister) kryo.register(cls)
@@ -61,13 +58,13 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
// Allow the user to register their own classes by setting spark.kryo.registrator
try {
- Try(conf.get("spark.kryo.registrator")).toOption.foreach { regCls =>
+ for (regCls <- conf.getOption("spark.kryo.registrator")) {
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
} catch {
- case _: Exception => println("Failed to register spark.kryo.registrator")
+ case e: Exception => logError("Failed to run spark.kryo.registrator", e)
}
// Register Chill's classes; we do this after our ranges and the user's own classes to let
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 3b25f68ca8..47478631a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
fetchRequestsSync.put(request)
}
- copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads", "6").toInt)
+ copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt)
logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
Utils.getUsedTimeMs(startTime))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 16ee208617..6d2cda97b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -49,7 +49,7 @@ private[spark] class BlockManager(
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
- val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean
- val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt
+ val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
+ val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
}
@@ -72,14 +72,14 @@ private[spark] class BlockManager(
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
val maxBytesInFlight =
- conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
// Whether to compress broadcast variables that are stored
- val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
+ val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean
// Whether to compress shuffle output that are stored
- val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean
+ val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
- val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean
+ val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
@@ -443,7 +443,7 @@ private[spark] class BlockManager(
: BlockFetcherIterator = {
val iter =
- if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) {
+ if (conf.get("spark.shuffle.use.netty", "false").toBoolean) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
} else {
new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
- val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean
+ val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
}
@@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging {
val ID_GENERATOR = new IdGenerator
def getMaxMemory(conf: SparkConf): Long = {
- val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble
+ val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
def getHeartBeatFrequency(conf: SparkConf): Long =
- conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+ conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
- conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+ conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 8e4a88b20a..b5afe8cd23 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -31,8 +31,8 @@ private[spark]
class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
conf: SparkConf) extends Logging {
- val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries", "3").toInt
- val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait", "3000").toInt
+ val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index dbbeeb39eb..58452d9657 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -50,10 +50,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
private val akkaTimeout = AkkaUtils.askTimeout(conf)
- val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs",
+ val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs",
"" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
- val checkTimeoutInterval = conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs",
+ val checkTimeoutInterval = conf.get("spark.storage.blockManagerTimeoutIntervalMs",
"60000").toLong
var timeoutCheckingTask: Cancellable = null
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7697092e1b..55dcb3742c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories", "64").toInt
+ private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 151eedb783..39dc7bb19a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -27,8 +27,6 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
-import scala.util.Try
-import org.apache.spark.SparkConf
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -66,9 +64,9 @@ class ShuffleBlockManager(blockManager: BlockManager) {
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
// TODO: Remove this once the shuffle file consolidation feature is stable.
val consolidateShuffleFiles =
- conf.getOrElse("spark.shuffle.consolidateFiles", "false").toBoolean
+ conf.get("spark.shuffle.consolidateFiles", "false").toBoolean
- private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+ private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024
/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 0ce8d9c8c4..50dfdbdf5a 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext) extends Logging {
val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
- val port = sc.conf.getOrElse("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
+ val port = sc.conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
var boundPort: Option[Int] = None
var server: Option[Server] = None
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 6b4602f928..88f41be8d3 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -66,7 +66,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
val classPathEntries = classPathProperty._2
- .split(sc.conf.getOrElse("path.separator", ":"))
+ .split(sc.conf.get("path.separator", ":"))
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 315014d27d..b7b87250b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
*/
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
- val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages", "1000").toInt
+ val RETAINED_STAGES = sc.conf.get("spark.ui.retained_stages", "1000").toInt
val DEFAULT_POOL_NAME = "default"
val stageIdToPool = new HashMap[Int, String]()
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 58b26f7f12..362cea5e3e 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -41,19 +41,19 @@ private[spark] object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
conf: SparkConf): (ActorSystem, Int) = {
- val akkaThreads = conf.getOrElse("spark.akka.threads", "4").toInt
- val akkaBatchSize = conf.getOrElse("spark.akka.batchSize", "15").toInt
+ val akkaThreads = conf.get("spark.akka.threads", "4").toInt
+ val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt
- val akkaTimeout = conf.getOrElse("spark.akka.timeout", "100").toInt
+ val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
- val akkaFrameSize = conf.getOrElse("spark.akka.frameSize", "10").toInt
+ val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
val lifecycleEvents =
- if (conf.getOrElse("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+ if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses", "600").toInt
+ val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
- conf.getOrElse("spark.akka.failure-detector.threshold", "300.0").toDouble
- val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval", "1000").toInt
+ conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
+ val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
val akkaConf = ConfigFactory.parseString(
s"""
@@ -89,6 +89,6 @@ private[spark] object AkkaUtils {
/** Returns the default Spark timeout to use for Akka ask operations. */
def askTimeout(conf: SparkConf): FiniteDuration = {
- Duration.create(conf.getOrElse("spark.akka.askTimeout", "30").toLong, "seconds")
+ Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
}
}
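The AkkaUtils hunk repeats the same convert-after-lookup step for every setting. One way to fold the conversion into the accessor is a set of typed helpers; the getInt/getDouble/getBoolean names below are purely illustrative, not a documented API:

    // Hypothetical typed wrappers around a string-valued get(key, default).
    class TypedConf(settings: Map[String, String]) {
      def get(key: String, default: String): String = settings.getOrElse(key, default)
      def getInt(key: String, default: Int): Int = get(key, default.toString).toInt
      def getDouble(key: String, default: Double): Double = get(key, default.toString).toDouble
      def getBoolean(key: String, default: Boolean): Boolean = get(key, default.toString).toBoolean
    }

    val conf = new TypedConf(Map.empty)
    val akkaThreads = conf.getInt("spark.akka.threads", 4)
    val lifecycleEvents = if (conf.getBoolean("spark.akka.logLifecycleEvents", false)) "on" else "off"
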
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 9ea7fc2dfd..aa7f52cafb 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -74,12 +74,12 @@ object MetadataCleanerType extends Enumeration {
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
object MetadataCleaner {
def getDelaySeconds(conf: SparkConf) = {
- conf.getOrElse("spark.cleaner.ttl", "3500").toInt
+ conf.get("spark.cleaner.ttl", "3500").toInt
}
def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
{
- conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
+ conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
.toInt
}
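The second getDelaySeconds overload above layers its defaults: the per-cleaner key is consulted first, and the global spark.cleaner.ttl value (itself defaulted) is the fallback. A compact sketch of that two-level lookup, with a plain Map standing in for SparkConf (the cleaner-specific key name below is illustrative):

    // Two-level default: a specific key falls back to a general one,
    // which in turn falls back to a hard-coded value.
    def delaySeconds(settings: Map[String, String], cleanerKey: String): Int = {
      val globalTtl = settings.getOrElse("spark.cleaner.ttl", "3500")
      settings.getOrElse(cleanerKey, globalTtl).toInt
    }

    delaySeconds(Map("spark.cleaner.ttl" -> "600"), "spark.cleaner.ttl.BLOCK_MANAGER") // 600
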
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ca3320b22b..5f1253100b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -311,7 +311,7 @@ private[spark] object Utils extends Logging {
* multiple paths.
*/
def getLocalDir(conf: SparkConf): String = {
- conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
}
/**
@@ -397,7 +397,7 @@ private[spark] object Utils extends Logging {
}
def localHostPort(conf: SparkConf): String = {
- val retval = conf.getOrElse("spark.hostPort", null)
+ val retval = conf.get("spark.hostPort", null)
if (retval == null) {
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
return localHostName()
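The localHostPort change keeps a null default and checks for it explicitly. Since SparkConf also offers getOption (exercised in the SparkConfSuite hunk further down), an Option-based lookup would express the same fallback without the null; this is a sketch of that alternative, not something the patch itself does:

    // Option-based variant of "get with a null default, then null-check".
    def hostPort(settings: Map[String, String], fallback: => String): String =
      settings.get("spark.hostPort") match {
        case Some(hp) => hp
        case None =>
          // The patch logs an error on this path before falling back.
          fallback
      }

    hostPort(Map.empty, "localhost") // "localhost"
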
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 77c7b829b3..ef5936dd2f 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -74,7 +74,7 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
assert(!conf.contains("k4"), "conf contained k4")
assert(conf.get("k1") === "v4")
intercept[Exception] { conf.get("k4") }
- assert(conf.getOrElse("k4", "not found") === "not found")
+ assert(conf.get("k4", "not found") === "not found")
assert(conf.getOption("k1") === Some("v4"))
assert(conf.getOption("k4") === None)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 5d33e66253..1eec6726f4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -83,7 +83,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
private val conf = new SparkConf
- val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
+ val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong
val MAX_TASK_FAILURES = 4
test("TaskSet with no preferences") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index f940448abd..af4b31d53c 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -27,8 +27,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import scala.util.Try
import akka.actor.{Props, ActorSelection, ActorSystem}
-class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
- private val testConf = new SparkConf
+class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+ private val testConf = new SparkConf(false)
val rootDir0 = Files.createTempDir()
rootDir0.deleteOnExit()
val rootDir1 = Files.createTempDir()
@@ -38,9 +38,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
// This suite focuses primarily on consolidation features,
// so we coerce consolidation if not already enabled.
- val consolidateProp = "spark.shuffle.consolidateFiles"
- val oldConsolidate = Try(testConf.get(consolidateProp)).toOption
- testConf.set(consolidateProp, "true")
+ testConf.set("spark.shuffle.consolidateFiles", "true")
val shuffleBlockManager = new ShuffleBlockManager(null) {
override def conf = testConf.clone
@@ -50,10 +48,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
var diskBlockManager: DiskBlockManager = _
- override def afterAll() {
- oldConsolidate.map(c => System.setProperty(consolidateProp, c))
- }
-
override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
shuffleBlockManager.idToSegmentMap.clear()
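The suite above now builds its configuration with new SparkConf(false); the boolean is taken here to mean "do not load defaults from system properties", which is why the old save-and-restore of the consolidation property in afterAll can be dropped. The isolation pattern, as a sketch:

    import org.apache.spark.SparkConf

    // Assumed meaning of the flag: skip external defaults, so the test
    // controls every setting it depends on explicitly.
    val testConf = new SparkConf(false)
      .set("spark.shuffle.consolidateFiles", "true")
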
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index 9dcdcfaa67..c111e2e90f 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -93,7 +93,7 @@ class SparkConf(object):
def set(self, key, value):
"""Set a configuration property."""
- self._jconf.set(key, value)
+ self._jconf.set(key, unicode(value))
return self
def setMaster(self, value):
@@ -132,13 +132,9 @@ class SparkConf(object):
self._jconf.set(k, v)
return self
- def get(self, key):
- """Get the configured value for some key, if set."""
- return self._jconf.get(key)
-
- def getOrElse(self, key, defaultValue):
- """Get the value for some key, or return a default otherwise."""
- return self._jconf.getOrElse(key, defaultValue)
+ def get(self, key, defaultValue=None):
+ """Get the configured value for some key, or return a default otherwise."""
+ return self._jconf.get(key, defaultValue)
def getAll(self):
"""Get all values as a list of key-value pairs."""
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index a993083164..59fdb0b37a 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -89,7 +89,7 @@ import org.apache.spark.util.Utils
/** Local directory to save .class files too */
val outputDir = {
val tmp = System.getProperty("java.io.tmpdir")
- val rootDir = new SparkConf().getOrElse("spark.repl.classdir", tmp)
+ val rootDir = new SparkConf().get("spark.repl.classdir", tmp)
Utils.createTempDir(rootDir)
}
if (SPARK_DEBUG_REPL) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index a230845b92..27d474c0a0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -174,8 +174,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/** A helper actor that communicates with the NetworkInputTracker */
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
- val ip = env.conf.getOrElse("spark.driver.host", "localhost")
- val port = env.conf.getOrElse("spark.driver.port", "7077").toInt
+ val ip = env.conf.get("spark.driver.host", "localhost")
+ val port = env.conf.get("spark.driver.port", "7077").toInt
val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorSelection(url)
val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = env.conf.getOrElse("spark.streaming.blockInterval", "200").toLong
+ val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
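The receiver above takes its block-pushing cadence from spark.streaming.blockInterval and hands it to a RecurringTimer. RecurringTimer is Spark-internal, so the sketch below uses a standard-library scheduler instead, purely to show how the configured interval drives the periodic callback:

    import java.util.concurrent.{Executors, TimeUnit}

    // Fire the buffer-rotation work every blockInterval milliseconds,
    // where the interval comes from configuration (defaulting to 200 ms).
    val settings = Map.empty[String, String]
    val blockInterval = settings.getOrElse("spark.streaming.blockInterval", "200").toLong

    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = println("rotate current buffer into a new block")
    }, blockInterval, blockInterval, TimeUnit.MILLISECONDS)
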
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 844180c81a..5f8be93a98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -46,7 +46,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
}))
val clock = {
- val clockClass = ssc.sc.conf.getOrElse(
+ val clockClass = ssc.sc.conf.get(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
Class.forName(clockClass).newInstance().asInstanceOf[Clock]
}
@@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = ssc.sc.conf.getOrElse("spark.streaming.manualClock.jump", "0").toLong
+ val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
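JobGenerator above chooses its clock implementation from a class name in the configuration and instantiates it reflectively, which is what lets tests plug in a ManualClock. A stripped-down sketch of the pattern, with a hypothetical Clock trait standing in for the streaming one:

    // Hypothetical stand-ins for the streaming clock interface.
    trait Clock { def currentTime(): Long }
    class SystemClock extends Clock { def currentTime(): Long = System.currentTimeMillis() }

    object ClockLoader {
      // Instantiate whichever implementation the config names, defaulting to
      // the system clock (the patch uses the fully qualified Spark class name).
      def clockFor(settings: Map[String, String]): Clock = {
        val clockClass = settings.getOrElse("spark.streaming.clock", classOf[SystemClock].getName)
        Class.forName(clockClass).newInstance().asInstanceOf[Clock]
      }
    }
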
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 651cdaaa6d..9304fc1a93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
val jobSets = new ConcurrentHashMap[Time, JobSet]
- val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
+ val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
val generator = new JobGenerator(this)
val listenerBus = new StreamingListenerBus()
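Finally, JobScheduler sizes its job-execution pool from spark.streaming.concurrentJobs, so by default jobs run one at a time and a larger value allows more to run concurrently. The sizing decision in isolation:

    import java.util.concurrent.Executors

    // One worker thread unless spark.streaming.concurrentJobs is raised.
    val settings = Map.empty[String, String]
    val numConcurrentJobs = settings.getOrElse("spark.streaming.concurrentJobs", "1").toInt
    val executor = Executors.newFixedThreadPool(numConcurrentJobs)
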