From e2c68642c64345434e2034082cf9b299491e9e9f Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 1 Jan 2014 22:03:39 -0500 Subject: Miscellaneous fixes from code review. Also replaced SparkConf.getOrElse with just a "get" that takes a default value, and added getInt, getLong, etc to make code that uses this simpler later on. --- .../main/scala/org/apache/spark/SparkConf.scala | 60 +++++++++++------- .../main/scala/org/apache/spark/SparkContext.scala | 73 +++++++++++++--------- .../src/main/scala/org/apache/spark/SparkEnv.scala | 10 +-- .../org/apache/spark/api/python/PythonRDD.scala | 4 +- .../org/apache/spark/broadcast/Broadcast.scala | 4 +- .../org/apache/spark/broadcast/HttpBroadcast.scala | 4 +- .../apache/spark/broadcast/TorrentBroadcast.scala | 2 +- .../org/apache/spark/deploy/master/Master.scala | 12 ++-- .../spark/deploy/master/MasterArguments.scala | 6 +- .../deploy/master/SparkZooKeeperSession.scala | 2 +- .../master/ZooKeeperLeaderElectionAgent.scala | 2 +- .../deploy/master/ZooKeeperPersistenceEngine.scala | 2 +- .../org/apache/spark/deploy/worker/Worker.scala | 2 +- .../spark/deploy/worker/ui/WorkerWebUI.scala | 2 +- .../scala/org/apache/spark/executor/Executor.scala | 2 +- .../org/apache/spark/io/CompressionCodec.scala | 4 +- .../org/apache/spark/metrics/MetricsSystem.scala | 2 +- .../apache/spark/network/ConnectionManager.scala | 18 +++--- .../apache/spark/network/netty/ShuffleCopier.scala | 2 +- .../scala/org/apache/spark/rdd/CheckpointRDD.scala | 4 +- .../apache/spark/scheduler/TaskResultGetter.scala | 2 +- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 10 +-- .../apache/spark/scheduler/TaskSetManager.scala | 16 ++--- .../cluster/CoarseGrainedSchedulerBackend.scala | 9 +-- .../scheduler/cluster/SimrSchedulerBackend.scala | 2 +- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../mesos/CoarseMesosSchedulerBackend.scala | 4 +- .../cluster/mesos/MesosSchedulerBackend.scala | 2 +- .../apache/spark/serializer/KryoSerializer.scala | 13 ++-- .../spark/storage/BlockFetcherIterator.scala | 2 +- .../org/apache/spark/storage/BlockManager.scala | 24 +++---- .../apache/spark/storage/BlockManagerMaster.scala | 4 +- .../spark/storage/BlockManagerMasterActor.scala | 4 +- .../apache/spark/storage/DiskBlockManager.scala | 2 +- .../apache/spark/storage/ShuffleBlockManager.scala | 6 +- .../main/scala/org/apache/spark/ui/SparkUI.scala | 4 +- .../org/apache/spark/ui/env/EnvironmentUI.scala | 2 +- .../apache/spark/ui/jobs/JobProgressListener.scala | 2 +- .../scala/org/apache/spark/util/AkkaUtils.scala | 18 +++--- .../org/apache/spark/util/MetadataCleaner.scala | 4 +- .../main/scala/org/apache/spark/util/Utils.scala | 4 +- .../scala/org/apache/spark/SparkConfSuite.scala | 2 +- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- .../spark/storage/DiskBlockManagerSuite.scala | 12 +--- 44 files changed, 195 insertions(+), 174 deletions(-) (limited to 'core/src') diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 96239cf4be..98343e9532 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -42,6 +42,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { /** Set a configuration variable. 
*/ def set(key: String, value: String): SparkConf = { + if (key == null) { + throw new NullPointerException("null key") + } + if (value == null) { + throw new NullPointerException("null value") + } settings(key) = value this } @@ -51,26 +57,17 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster. */ def setMaster(master: String): SparkConf = { - if (master != null) { - settings("spark.master") = master - } - this + set("spark.master", master) } /** Set a name for your application. Shown in the Spark web UI. */ def setAppName(name: String): SparkConf = { - if (name != null) { - settings("spark.app.name") = name - } - this + set("spark.app.name", name) } /** Set JAR files to distribute to the cluster. */ def setJars(jars: Seq[String]): SparkConf = { - if (!jars.isEmpty) { - settings("spark.jars") = jars.mkString(",") - } - this + set("spark.jars", jars.mkString(",")) } /** Set JAR files to distribute to the cluster. (Java-friendly version.) */ @@ -84,8 +81,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { * (for example spark.executorEnv.PATH) but this method makes them easier to set. */ def setExecutorEnv(variable: String, value: String): SparkConf = { - settings("spark.executorEnv." + variable) = value - this + set("spark.executorEnv." + variable, value) } /** @@ -112,10 +108,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { * Set the location where Spark is installed on worker nodes. */ def setSparkHome(home: String): SparkConf = { - if (home != null) { - settings("spark.home") = home - } - this + set("spark.home", home) } /** Set multiple parameters together */ @@ -132,9 +125,20 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { this } - /** Get a parameter; throws an exception if it's not set */ + /** Remove a parameter from the configuration */ + def remove(key: String): SparkConf = { + settings.remove(key) + this + } + + /** Get a parameter; throws a NoSuchElementException if it's not set */ def get(key: String): String = { - settings(key) + settings.getOrElse(key, throw new NoSuchElementException(key)) + } + + /** Get a parameter, falling back to a default if not set */ + def get(key: String, defaultValue: String): String = { + settings.getOrElse(key, defaultValue) } /** Get a parameter as an Option */ @@ -145,9 +149,19 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { /** Get all parameters as a list of pairs */ def getAll: Array[(String, String)] = settings.clone().toArray - /** Get a parameter, falling back to a default if not set */ - def getOrElse(k: String, defaultValue: String): String = { - settings.getOrElse(k, defaultValue) + /** Get a parameter as an integer, falling back to a default if not set */ + def getInt(key: String, defaultValue: Int): Int = { + getOption(key).map(_.toInt).getOrElse(defaultValue) + } + + /** Get a parameter as a long, falling back to a default if not set */ + def getLong(key: String, defaultValue: Long): Long = { + getOption(key).map(_.toLong).getOrElse(defaultValue) + } + + /** Get a parameter as a double, falling back to a default if not set */ + def getDouble(key: String, defaultValue: Double): Double = { + getOption(key).map(_.toDouble).getOrElse(defaultValue) } /** Get all executor environment variables set on this SparkConf */ diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala index 46874c41a2..84bd0f7ffd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -22,12 +22,11 @@ import java.net.URI import java.util.{UUID, Properties} import java.util.concurrent.atomic.AtomicInteger -import scala.collection.{Map, Set, immutable} +import scala.collection.{Map, Set} import scala.collection.generic.Growable import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.reflect.{ClassTag, classTag} -import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -49,7 +48,8 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} import org.apache.spark.ui.SparkUI -import org.apache.spark.util._ +import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType, +ClosureCleaner} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -77,7 +77,7 @@ class SparkContext( * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters */ def this(master: String, appName: String, conf: SparkConf) = - this(conf.clone().setMaster(master).setAppName(appName)) + this(SparkContext.updatedConf(conf, master, appName)) /** * Alternative constructor that allows setting common Spark properties directly @@ -97,13 +97,7 @@ class SparkContext( environment: Map[String, String] = Map(), preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = { - this( - new SparkConf() - .setMaster(master) - .setAppName(appName) - .setJars(jars) - .setExecutorEnv(environment.toSeq) - .setSparkHome(sparkHome), + this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment), preferredNodeLocationData) } @@ -175,11 +169,9 @@ class SparkContext( // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner - for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) { - val value = System.getenv(key) - if (value != null) { - executorEnvs(key) = value - } + for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING"); + value <- Option(System.getenv(key))) { + executorEnvs(key) = value } // Since memory can be set with a system property too, use that executorEnvs("SPARK_MEM") = executorMemory + "m" @@ -220,7 +212,7 @@ class SparkContext( hadoopConf.set(key.substring("spark.hadoop.".length), value) } } - val bufferSize = conf.getOrElse("spark.buffer.size", "65536") + val bufferSize = conf.get("spark.buffer.size", "65536") hadoopConf.set("io.file.buffer.size", bufferSize) hadoopConf } @@ -733,13 +725,7 @@ class SparkContext( * (in that order of preference). If neither of these is set, return None. 
*/ private[spark] def getSparkHome(): Option[String] = { - if (conf.contains("spark.home")) { - Some(conf.get("spark.home")) - } else if (System.getenv("SPARK_HOME") != null) { - Some(System.getenv("SPARK_HOME")) - } else { - None - } + conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME"))) } /** @@ -1026,7 +1012,7 @@ object SparkContext { /** * Find the JAR from which a given class was loaded, to make it easy for users to pass - * their JARs to SparkContext + * their JARs to SparkContext. */ def jarOfClass(cls: Class[_]): Seq[String] = { val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class") @@ -1043,10 +1029,41 @@ object SparkContext { } } - /** Find the JAR that contains the class of a particular object */ + /** + * Find the JAR that contains the class of a particular object, to make it easy for users + * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in + * your driver program. + */ def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass) - // Creates a task scheduler based on a given master URL. Extracted for testing. + /** + * Creates a modified version of a SparkConf with the parameters that can be passed separately + * to SparkContext, to make it easier to write SparkContext's constructors. This ignores + * parameters that are passed as the default value of null, instead of throwing an exception + * like SparkConf would. + */ + private def updatedConf( + conf: SparkConf, + master: String, + appName: String, + sparkHome: String = null, + jars: Seq[String] = Nil, + environment: Map[String, String] = Map()): SparkConf = + { + val res = conf.clone() + res.setMaster(master) + res.setAppName(appName) + if (sparkHome != null) { + res.setSparkHome(sparkHome) + } + if (!jars.isEmpty) { + res.setJars(jars) + } + res.setExecutorEnv(environment.toSeq) + res + } + + /** Creates a task scheduler based on a given master URL. Extracted for testing. 
*/ private def createTaskScheduler(sc: SparkContext, master: String, appName: String) : TaskScheduler = { @@ -1156,7 +1173,7 @@ object SparkContext { case mesosUrl @ MESOS_REGEX(_) => MesosNativeLibrary.load() val scheduler = new TaskSchedulerImpl(sc) - val coarseGrained = sc.conf.getOrElse("spark.mesos.coarse", "false").toBoolean + val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs val backend = if (coarseGrained) { new CoarseMesosSchedulerBackend(scheduler, sc, url, appName) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index d06af8e667..634a94f0a7 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -144,17 +144,17 @@ object SparkEnv extends Logging { // Create an instance of the class named by the given Java system property, or by // defaultClassName if the property is not set, and return it as a T def instantiateClass[T](propertyName: String, defaultClassName: String): T = { - val name = conf.getOrElse(propertyName, defaultClassName) + val name = conf.get(propertyName, defaultClassName) Class.forName(name, true, classLoader).newInstance().asInstanceOf[T] } val serializerManager = new SerializerManager val serializer = serializerManager.setDefault( - conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf) + conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf) val closureSerializer = serializerManager.get( - conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"), + conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"), conf) def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = { @@ -162,8 +162,8 @@ object SparkEnv extends Logging { logInfo("Registering " + name) Left(actorSystem.actorOf(Props(newActor), name = name)) } else { - val driverHost: String = conf.getOrElse("spark.driver.host", "localhost") - val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt + val driverHost: String = conf.get("spark.driver.host", "localhost") + val driverPort: Int = conf.get("spark.driver.port", "7077").toInt Utils.checkHost(driverHost, "Expected hostname") val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name) logInfo("Connecting to " + name + ": " + url) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 05fd824254..32cc70e8c9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag]( accumulator: Accumulator[JList[Array[Byte]]]) extends RDD[Array[Byte]](parent) { - val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt + val bufferSize = conf.get("spark.buffer.size", "65536").toInt override def getPartitions = parent.partitions @@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Utils.checkHost(serverHost, "Expected hostname") - val bufferSize = SparkEnv.get.conf.getOrElse("spark.buffer.size", "65536").toInt + val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new 
JArrayList diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index be99d229ef..0fc478a419 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -31,7 +31,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable { override def toString = "Broadcast(" + id + ")" } -private[spark] +private[spark] class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging with Serializable { private var initialized = false @@ -43,7 +43,7 @@ class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging private def initialize() { synchronized { if (!initialized) { - val broadcastFactoryClass = conf.getOrElse( + val broadcastFactoryClass = conf.get( "spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory") broadcastFactory = diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 47528bcee8..db596d5fcc 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -92,8 +92,8 @@ private object HttpBroadcast extends Logging { def initialize(isDriver: Boolean, conf: SparkConf) { synchronized { if (!initialized) { - bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt - compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean + bufferSize = conf.get("spark.buffer.size", "65536").toInt + compress = conf.get("spark.broadcast.compress", "true").toBoolean if (isDriver) { createServer(conf) conf.set("spark.httpBroadcast.uri", serverUri) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 00ec3b971b..9530938278 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -180,7 +180,7 @@ extends Logging { initialized = false } - lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024 + lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024 def blockifyObject[T](obj: T): TorrentInfo = { val byteArray = Utils.serialize[T](obj) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9c89e36b14..7b696cfcca 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -43,11 +43,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val conf = new SparkConf val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs - val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 - val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt - val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt - val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "") - val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE") + val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000 + val RETAINED_APPLICATIONS = 
conf.get("spark.deploy.retainedApplications", "200").toInt + val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt + val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "") + val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE") var nextAppNumber = 0 val workers = new HashSet[WorkerInfo] @@ -88,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. - val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean + val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean override def preStart() { logInfo("Starting Spark master at " + masterUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala index 7ce83f9c36..e7f3224091 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala @@ -27,8 +27,8 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() var port = 7077 var webUiPort = 8080 - - // Check for settings in environment variables + + // Check for settings in environment variables if (System.getenv("SPARK_MASTER_HOST") != null) { host = System.getenv("SPARK_MASTER_HOST") } @@ -38,7 +38,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) { webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt } - if (conf.get("master.ui.port") != null) { + if (conf.contains("master.ui.port")) { webUiPort = conf.get("master.ui.port").toInt } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala index 60c7a7c2d6..999090ad74 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala @@ -37,7 +37,7 @@ import org.apache.spark.{SparkConf, Logging} */ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher, conf: SparkConf) extends Logging { - val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "") + val ZK_URL = conf.get("spark.deploy.zookeeper.url", "") val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE val ZK_TIMEOUT_MILLIS = 30000 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala index a61597bbdf..77c23fb9fb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -28,7 +28,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String, conf: SparkConf) extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { - val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" + val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" private 
val watcher = new ZooKeeperWatcher() private val zk = new SparkZooKeeperSession(this, conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 245a558a59..52000d4f9c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -27,7 +27,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) with SparkZooKeeperWatcher with Logging { - val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status" + val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" val zk = new SparkZooKeeperSession(this, conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f844fcbbfc..fcaf4e92b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -55,7 +55,7 @@ private[spark] class Worker( val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs // Send a heartbeat every (heartbeat timeout) / 4 milliseconds - val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4 + val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4 val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index a801d85770..c382034c99 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -37,7 +37,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val timeout = AkkaUtils.askTimeout(worker.conf) val host = Utils.localHostName() val port = requestedPort.getOrElse( - worker.conf.getOrElse("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) + worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) var server: Option[Server] = None var boundPort: Option[Int] = None diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5b70165c35..3c92c205ea 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -302,7 +302,7 @@ private[spark] class Executor( * new classes defined by the REPL as the user types code */ private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = { - val classUri = conf.getOrElse("spark.repl.class.uri", null) + val classUri = conf.get("spark.repl.class.uri", null) if (classUri != null) { logInfo("Using REPL class URI: " + classUri) try { diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 075a18b068..a1e98845f6 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -39,7 +39,7 @@ trait CompressionCodec { private[spark] object CompressionCodec { def createCodec(conf: SparkConf): CompressionCodec = { - createCodec(conf, 
conf.getOrElse( + createCodec(conf, conf.get( "spark.io.compression.codec", classOf[LZFCompressionCodec].getName)) } @@ -71,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { - val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt + val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt new SnappyOutputStream(s, blockSize) } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 0e41c73ce7..9930537b34 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -65,7 +65,7 @@ import org.apache.spark.metrics.source.Source private[spark] class MetricsSystem private (val instance: String, conf: SparkConf) extends Logging { - val confFile = conf.getOrElse("spark.metrics.conf", null) + val confFile = conf.get("spark.metrics.conf", null) val metricsConfig = new MetricsConfig(Option(confFile)) val sinks = new mutable.ArrayBuffer[Sink] diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 697096fa76..46c40d0a2a 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi private val selector = SelectorProvider.provider.openSelector() private val handleMessageExecutor = new ThreadPoolExecutor( - conf.getOrElse("spark.core.connection.handler.threads.min", "20").toInt, - conf.getOrElse("spark.core.connection.handler.threads.max", "60").toInt, - conf.getOrElse("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS, + conf.get("spark.core.connection.handler.threads.min", "20").toInt, + conf.get("spark.core.connection.handler.threads.max", "60").toInt, + conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable]()) private val handleReadWriteExecutor = new ThreadPoolExecutor( - conf.getOrElse("spark.core.connection.io.threads.min", "4").toInt, - conf.getOrElse("spark.core.connection.io.threads.max", "32").toInt, - conf.getOrElse("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS, + conf.get("spark.core.connection.io.threads.min", "4").toInt, + conf.get("spark.core.connection.io.threads.max", "32").toInt, + conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable]()) // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap private val handleConnectExecutor = new ThreadPoolExecutor( - conf.getOrElse("spark.core.connection.connect.threads.min", "1").toInt, - conf.getOrElse("spark.core.connection.connect.threads.max", "8").toInt, - conf.getOrElse("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS, + conf.get("spark.core.connection.connect.threads.min", "1").toInt, + conf.get("spark.core.connection.connect.threads.max", "8").toInt, + conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS, new 
LinkedBlockingDeque[Runnable]()) private val serverChannel = ServerSocketChannel.open() diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index db28ddf9ac..b729eb11c5 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging { resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) { val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback) - val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt + val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt val fc = new FileClient(handler, connectTimeout) try { diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index 172ba6b01c..6d4f46125f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -97,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging { throw new IOException("Checkpoint failed: temporary path " + tempOutputPath + " already exists") } - val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt + val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt val fileOutputStream = if (blockSize < 0) { fs.create(tempOutputPath, false, bufferSize) @@ -131,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging { ): Iterator[T] = { val env = SparkEnv.get val fs = path.getFileSystem(broadcastedConf.value.value) - val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt + val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() val deserializeStream = serializer.deserializeStream(fileInputStream) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 29b0247f8a..e22b1e53e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.Utils private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl) extends Logging { - private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads", "4").toInt + private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool( THREADS, "Result resolver thread") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bffd990e16..d94b706854 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -51,15 +51,15 @@ private[spark] class TaskSchedulerImpl( isLocal: Boolean = false) extends TaskScheduler with Logging { - def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt) + def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt) val conf = sc.conf // How often to check 
for speculative tasks - val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong + val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong // Threshold above which we warn user initial TaskSet may be starved - val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong + val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. @@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl( var rootPool: Pool = null // default scheduler is FIFO val schedulingMode: SchedulingMode = SchedulingMode.withName( - conf.getOrElse("spark.scheduler.mode", "FIFO")) + conf.get("spark.scheduler.mode", "FIFO")) // This is a var so that we can reset it for testing purposes. private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) @@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl( override def start() { backend.start() - if (!isLocal && conf.getOrElse("spark.speculation", "false").toBoolean) { + if (!isLocal && conf.get("spark.speculation", "false").toBoolean) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b99664ae00..67ad99a4d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -57,11 +57,11 @@ private[spark] class TaskSetManager( val conf = sched.sc.conf // CPUs to request per task - val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt + val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt // Quantile of tasks at which to start speculation - val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble - val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble + val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble + val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble // Serializer for closures and tasks. 
val env = SparkEnv.get @@ -116,7 +116,7 @@ private[spark] class TaskSetManager( // How frequently to reprint duplicate exceptions in full, in milliseconds val EXCEPTION_PRINT_INTERVAL = - conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong + conf.get("spark.logging.exceptionPrintInterval", "10000").toLong // Map of recent exceptions (identified by string representation and top stack frame) to // duplicate count (how many times the same exception has appeared) and time the full exception @@ -678,14 +678,14 @@ private[spark] class TaskSetManager( } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { - val defaultWait = conf.getOrElse("spark.locality.wait", "3000") + val defaultWait = conf.get("spark.locality.wait", "3000") level match { case TaskLocality.PROCESS_LOCAL => - conf.getOrElse("spark.locality.wait.process", defaultWait).toLong + conf.get("spark.locality.wait.process", defaultWait).toLong case TaskLocality.NODE_LOCAL => - conf.getOrElse("spark.locality.wait.node", defaultWait).toLong + conf.get("spark.locality.wait.node", defaultWait).toLong case TaskLocality.RACK_LOCAL => - conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong + conf.get("spark.locality.wait.rack", defaultWait).toLong case TaskLocality.ANY => 0L } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b4a3ecca39..2f5bcafe40 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Await import scala.concurrent.duration._ -import scala.util.Try import akka.actor._ import akka.pattern.ask @@ -64,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) // Periodically revive offers to allow delay scheduling to work - val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong + val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong import context.dispatcher context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers) } @@ -209,8 +208,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A driverActor ! 
KillTask(taskId, executorId) } - override def defaultParallelism() = Try(conf.get("spark.default.parallelism")).toOption - .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2)) + override def defaultParallelism(): Int = { + conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse( + math.max(totalCoreCount.get(), 2)) + } // Called by subclasses when notified of a lost worker def removeExecutor(executorId: String, reason: String) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index f41fbbd1f3..b44d1e43c8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -33,7 +33,7 @@ private[spark] class SimrSchedulerBackend( val tmpPath = new Path(driverFilePath + "_tmp") val filePath = new Path(driverFilePath) - val maxCores = conf.getOrElse("spark.simr.executor.cores", "1").toInt + val maxCores = conf.get("spark.simr.executor.cores", "1").toInt override def start() { super.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 224077566d..9858717d13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend( var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ - val maxCores = conf.getOrElse("spark.cores.max", Int.MaxValue.toString).toInt + val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt override def start() { super.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 9e2cd3f699..d247fa4244 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -62,7 +62,7 @@ private[spark] class CoarseMesosSchedulerBackend( var driver: SchedulerDriver = null // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.getOrElse("spark.cores.max", Int.MaxValue.toString).toInt + val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] @@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend( "Spark home is not set; set it through the spark.home system " + "property, the SPARK_HOME environment variable or the SparkContext constructor")) - val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores", "0").toInt + val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt var nextMesosTaskId = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index be96382983..c20fc418e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend( } // TODO: query Mesos for number of cores - override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism", "8").toInt + override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 2367f3f521..a24a3b04b8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -29,17 +29,14 @@ import org.apache.spark._ import org.apache.spark.broadcast.HttpBroadcast import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage._ -import scala.util.Try -import org.apache.spark.storage.PutBlock -import org.apache.spark.storage.GetBlock -import org.apache.spark.storage.GotBlock +import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock} /** * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. */ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging { private val bufferSize = { - conf.getOrElse("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024 + conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024 } def newKryoOutput() = new KryoOutput(bufferSize) @@ -51,7 +48,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops. // Do this before we invoke the user registrator so the user registrator can override this. 
- kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking", "true").toBoolean) + kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean) for (cls <- KryoSerializer.toRegister) kryo.register(cls) @@ -61,13 +58,13 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial // Allow the user to register their own classes by setting spark.kryo.registrator try { - Try(conf.get("spark.kryo.registrator")).toOption.foreach { regCls => + for (regCls <- conf.getOption("spark.kryo.registrator")) { logDebug("Running user registrator: " + regCls) val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] reg.registerClasses(kryo) } } catch { - case _: Exception => println("Failed to register spark.kryo.registrator") + case e: Exception => logError("Failed to run spark.kryo.registrator", e) } // Register Chill's classes; we do this after our ranges and the user's own classes to let diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 3b25f68ca8..47478631a1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -327,7 +327,7 @@ object BlockFetcherIterator { fetchRequestsSync.put(request) } - copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads", "6").toInt) + copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt) logInfo("Started " + fetchRequestsSync.size + " remote gets in " + Utils.getUsedTimeMs(startTime)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 16ee208617..6d2cda97b0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -49,7 +49,7 @@ private[spark] class BlockManager( val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, - conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir"))) + conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] @@ -58,8 +58,8 @@ private[spark] class BlockManager( // If we use Netty for shuffle, start a new Netty-based shuffle sender service. 
private val nettyPort: Int = { - val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean - val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt + val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean + val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0 } @@ -72,14 +72,14 @@ private[spark] class BlockManager( // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) val maxBytesInFlight = - conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 + conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 // Whether to compress broadcast variables that are stored - val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean + val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean // Whether to compress shuffle output that are stored - val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean + val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean // Whether to compress RDD partitions that are stored serialized - val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean + val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) @@ -443,7 +443,7 @@ private[spark] class BlockManager( : BlockFetcherIterator = { val iter = - if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) { + if (conf.get("spark.shuffle.use.netty", "false").toBoolean) { new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer) } else { new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer) @@ -469,7 +469,7 @@ private[spark] class BlockManager( def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) - val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean + val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites) } @@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging { val ID_GENERATOR = new IdGenerator def getMaxMemory(conf: SparkConf): Long = { - val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble + val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble (Runtime.getRuntime.maxMemory * memoryFraction).toLong } def getHeartBeatFrequency(conf: SparkConf): Long = - conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4 + conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4 def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean = - conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean + conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean /** * Attempt to clean up a ByteBuffer if it is memory-mapped. 
This uses an *unsafe* Sun API that diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 8e4a88b20a..b5afe8cd23 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -31,8 +31,8 @@ private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection], conf: SparkConf) extends Logging { - val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries", "3").toInt - val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait", "3000").toInt + val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt + val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster" diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index dbbeeb39eb..58452d9657 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -50,10 +50,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private val akkaTimeout = AkkaUtils.askTimeout(conf) - val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs", + val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs", "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong - val checkTimeoutInterval = conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", + val checkTimeoutInterval = conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong var timeoutCheckingTask: Cancellable = null diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 7697092e1b..55dcb3742c 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD extends PathResolver with Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories", "64").toInt + private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt // Create one local directory for each path mentioned in spark.local.dir; then, inside this // directory, create multiple subdirectories that we will hash files into, in order to avoid diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 151eedb783..39dc7bb19a 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -27,8 +27,6 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup -import scala.util.Try -import org.apache.spark.SparkConf /** A group of writers for a ShuffleMapTask, one 
writer per reducer. */ private[spark] trait ShuffleWriterGroup { @@ -66,9 +64,9 @@ class ShuffleBlockManager(blockManager: BlockManager) { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = - conf.getOrElse("spark.shuffle.consolidateFiles", "false").toBoolean + conf.get("spark.shuffle.consolidateFiles", "false").toBoolean - private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024 /** * Contains all the state related to a particular shuffle. This includes a pool of unused diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 0ce8d9c8c4..50dfdbdf5a 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -32,7 +32,7 @@ import org.apache.spark.util.Utils /** Top level user interface for Spark */ private[spark] class SparkUI(sc: SparkContext) extends Logging { val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName()) - val port = sc.conf.getOrElse("spark.ui.port", SparkUI.DEFAULT_PORT).toInt + val port = sc.conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt var boundPort: Option[Int] = None var server: Option[Server] = None diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala index 6b4602f928..88f41be8d3 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala @@ -66,7 +66,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) { UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true) val classPathEntries = classPathProperty._2 - .split(sc.conf.getOrElse("path.separator", ":")) + .split(sc.conf.get("path.separator", ":")) .filterNot(e => e.isEmpty) .map(e => (e, "System Classpath")) val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 315014d27d..b7b87250b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -33,7 +33,7 @@ import org.apache.spark.scheduler._ */ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener { // How many stages to remember - val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages", "1000").toInt + val RETAINED_STAGES = sc.conf.get("spark.ui.retained_stages", "1000").toInt val DEFAULT_POOL_NAME = "default" val stageIdToPool = new HashMap[Int, String]() diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 58b26f7f12..362cea5e3e 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
@@ -41,19 +41,19 @@ private[spark] object AkkaUtils { def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false, conf: SparkConf): (ActorSystem, Int) = { - val akkaThreads = conf.getOrElse("spark.akka.threads", "4").toInt - val akkaBatchSize = conf.getOrElse("spark.akka.batchSize", "15").toInt + val akkaThreads = conf.get("spark.akka.threads", "4").toInt + val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt - val akkaTimeout = conf.getOrElse("spark.akka.timeout", "100").toInt + val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt - val akkaFrameSize = conf.getOrElse("spark.akka.frameSize", "10").toInt + val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt val lifecycleEvents = - if (conf.getOrElse("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" + if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" - val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses", "600").toInt + val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt val akkaFailureDetector = - conf.getOrElse("spark.akka.failure-detector.threshold", "300.0").toDouble - val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval", "1000").toInt + conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble + val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt val akkaConf = ConfigFactory.parseString( s""" @@ -89,6 +89,6 @@ private[spark] object AkkaUtils { /** Returns the default Spark timeout to use for Akka ask operations. */ def askTimeout(conf: SparkConf): FiniteDuration = { - Duration.create(conf.getOrElse("spark.akka.askTimeout", "30").toLong, "seconds") + Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds") } } diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 9ea7fc2dfd..aa7f52cafb 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -74,12 +74,12 @@ object MetadataCleanerType extends Enumeration { // initialization of StreamingContext. It's okay for users trying to configure stuff themselves. object MetadataCleaner { def getDelaySeconds(conf: SparkConf) = { - conf.getOrElse("spark.cleaner.ttl", "3500").toInt + conf.get("spark.cleaner.ttl", "3500").toInt } def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int = { - conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString) + conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString) .toInt } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ca3320b22b..5f1253100b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -311,7 +311,7 @@ private[spark] object Utils extends Logging { * multiple paths. 
*/ def getLocalDir(conf: SparkConf): String = { - conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0) + conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0) } /** @@ -397,7 +397,7 @@ private[spark] object Utils extends Logging { } def localHostPort(conf: SparkConf): String = { - val retval = conf.getOrElse("spark.hostPort", null) + val retval = conf.get("spark.hostPort", null) if (retval == null) { logErrorWithStack("spark.hostPort not set but invoking localHostPort") return localHostName() diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 77c7b829b3..ef5936dd2f 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -74,7 +74,7 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { assert(!conf.contains("k4"), "conf contained k4") assert(conf.get("k1") === "v4") intercept[Exception] { conf.get("k4") } - assert(conf.getOrElse("k4", "not found") === "not found") + assert(conf.get("k4", "not found") === "not found") assert(conf.getOption("k1") === Some("v4")) assert(conf.getOption("k4") === None) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 5d33e66253..1eec6726f4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -83,7 +83,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { private val conf = new SparkConf - val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong + val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong val MAX_TASK_FAILURES = 4 test("TaskSet with no preferences") { diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index f940448abd..af4b31d53c 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -27,8 +27,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import scala.util.Try import akka.actor.{Props, ActorSelection, ActorSystem} -class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll { - private val testConf = new SparkConf +class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { + private val testConf = new SparkConf(false) val rootDir0 = Files.createTempDir() rootDir0.deleteOnExit() val rootDir1 = Files.createTempDir() @@ -38,9 +38,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before // This suite focuses primarily on consolidation features, // so we coerce consolidation if not already enabled. 
- val consolidateProp = "spark.shuffle.consolidateFiles" - val oldConsolidate = Try(testConf.get(consolidateProp)).toOption - testConf.set(consolidateProp, "true") + testConf.set("spark.shuffle.consolidateFiles", "true") val shuffleBlockManager = new ShuffleBlockManager(null) { override def conf = testConf.clone @@ -50,10 +48,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before var diskBlockManager: DiskBlockManager = _ - override def afterAll() { - oldConsolidate.map(c => System.setProperty(consolidateProp, c)) - } - override def beforeEach() { diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs) shuffleBlockManager.idToSegmentMap.clear()
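
Usage sketch (not part of the patch): the snippet below illustrates the revised SparkConf accessors this change introduces, namely get(key, default) in place of the old getOrElse, plus the typed getInt/getLong/getDouble helpers. The configuration keys are taken from elsewhere in this diff; the values and the object name are illustrative only.

import org.apache.spark.SparkConf

object SparkConfUsageSketch {
  def main(args: Array[String]): Unit = {
    // set/setMaster/setAppName now throw on null and return `this`, so calls chain.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("SparkConfUsageSketch")
      .set("spark.buffer.size", "131072")

    // The old conf.getOrElse(key, default) becomes conf.get(key, default).
    val bufferSize = conf.get("spark.buffer.size", "65536").toInt

    // The new typed accessors drop the .toInt/.toLong/.toDouble boilerplate.
    val maxFailures = conf.getInt("spark.task.maxFailures", 4)
    val localityWait = conf.getLong("spark.locality.wait", 3000L)
    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)

    // get(key) with no default now throws NoSuchElementException when the key is
    // missing; use getOption or contains to probe without throwing.
    val master = conf.getOption("spark.master").getOrElse("local")

    println(Seq(bufferSize, maxFailures, localityWait, memoryFraction, master).mkString(" "))
  }
}

As the diff shows, existing call sites (TaskSchedulerImpl, BlockManager, AkkaUtils, and so on) are converted mechanically from getOrElse to get with a default; the typed helpers are added so that such string-typed reads can be simplified in later changes, as the commit message notes.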