34 files changed, 78 insertions, 79 deletions
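This patch is a mechanical cleanup: every `conf.get("key", "stringDefault").toX` call is replaced with the corresponding typed accessor on SparkConf (`getInt`, `getLong`, `getDouble`, `getBoolean`), so defaults are written as typed literals instead of strings parsed at each call site. The accessors themselves live in SparkConf.scala, which is not part of this diff; as a rough sketch, inferred from the call sites below and from the `getOption` method the patch already uses (signatures assumed, not shown by this patch), each presumably reduces to a typed parse with a typed fallback:

      // Sketch only: typed lookup with a typed default, assuming a
      // getOption(key): Option[String] primitive on SparkConf.
      def getInt(key: String, defaultValue: Int): Int =
        getOption(key).map(_.toInt).getOrElse(defaultValue)

      def getLong(key: String, defaultValue: Long): Long =
        getOption(key).map(_.toLong).getOrElse(defaultValue)

      def getDouble(key: String, defaultValue: Double): Double =
        getOption(key).map(_.toDouble).getOrElse(defaultValue)

      def getBoolean(key: String, defaultValue: Boolean): Boolean =
        getOption(key).map(_.toBoolean).getOrElse(defaultValue)

Behavior is unchanged when a key is set; when it is unset, the typed default is returned directly instead of being parsed from a string. A note on the one non-mechanical hunk follows the diff.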
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fce8f2d48c..f91392b351 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,7 +116,7 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }
 
-  if (conf.get("spark.logConf", "false").toBoolean) {
+  if (conf.getBoolean("spark.logConf", false)) {
     logInfo("Spark configuration:\n" + conf.toDebugString)
   }
 
@@ -1209,7 +1209,7 @@ object SparkContext {
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
         val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean
+        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
           new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 2e36ccb9a0..e093e2f162 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -162,7 +162,7 @@ object SparkEnv extends Logging {
       actorSystem.actorOf(Props(newActor), name = name)
     } else {
       val driverHost: String = conf.get("spark.driver.host", "localhost")
-      val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
+      val driverPort: Int = conf.getInt("spark.driver.port", 7077)
       Utils.checkHost(driverHost, "Expected hostname")
       val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
       val timeout = AkkaUtils.lookupTimeout(conf)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 32cc70e8c9..40c519b5bd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = conf.get("spark.buffer.size", "65536").toInt
+  val bufferSize = conf.getInt("spark.buffer.size", 65536)
 
   override def getPartitions = parent.partitions
 
@@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
 
   Utils.checkHost(serverHost, "Expected hostname")
 
-  val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt
+  val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)
 
   override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index db596d5fcc..0eacda3d7d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -92,8 +92,8 @@ private object HttpBroadcast extends Logging {
   def initialize(isDriver: Boolean, conf: SparkConf) {
     synchronized {
       if (!initialized) {
-        bufferSize = conf.get("spark.buffer.size", "65536").toInt
-        compress = conf.get("spark.broadcast.compress", "true").toBoolean
+        bufferSize = conf.getInt("spark.buffer.size", 65536)
+        compress = conf.getBoolean("spark.broadcast.compress", true)
         if (isDriver) {
           createServer(conf)
           conf.set("spark.httpBroadcast.uri", serverUri)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 9530938278..fdf92eca4f 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -180,7 +180,7 @@ extends Logging {
     initialized = false
   }
 
-  lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024
+  lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
 
   def blockifyObject[T](obj: T): TorrentInfo = {
     val byteArray = Utils.serialize[T](obj)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 6617b7100f..066d1107d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -43,9 +43,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val conf = new SparkConf
 
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
-  val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
-  val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+  val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
+  val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
+  val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index fcaf4e92b1..0538e521f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -55,7 +55,7 @@ private[spark] class Worker(
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4
+  val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index a1e98845f6..5980177320 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -71,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt
+    val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
     new SnappyOutputStream(s, blockSize)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 46c40d0a2a..e6e01783c8 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
   private val selector = SelectorProvider.provider.openSelector()
 
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.handler.threads.min", "20").toInt,
-    conf.get("spark.core.connection.handler.threads.max", "60").toInt,
-    conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.handler.threads.min", 20),
+    conf.getInt("spark.core.connection.handler.threads.max", 60),
+    conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.io.threads.min", "4").toInt,
-    conf.get("spark.core.connection.io.threads.max", "32").toInt,
-    conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.io.threads.min", 4),
+    conf.getInt("spark.core.connection.io.threads.max", 32),
+    conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.connect.threads.min", "1").toInt,
-    conf.get("spark.core.connection.connect.threads.max", "8").toInt,
-    conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.connect.threads.min", 1),
+    conf.getInt("spark.core.connection.connect.threads.max", 8),
+    conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index b729eb11c5..d87157e12c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
       resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
 
     val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt
+    val connectTimeout = conf.getInt("spark.shuffle.netty.connect.timeout", 60000)
     val fc = new FileClient(handler, connectTimeout)
 
     try {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 6d4f46125f..83109d1a6f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -97,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging {
       throw new IOException("Checkpoint failed: temporary path " + tempOutputPath + " already exists")
     }
 
-    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
 
     val fileOutputStream = if (blockSize < 0) {
       fs.create(tempOutputPath, false, bufferSize)
@@ -131,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging {
   ): Iterator[T] = {
     val env = SparkEnv.get
     val fs = path.getFileSystem(broadcastedConf.value.value)
-    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
     val deserializeStream = serializer.deserializeStream(fileInputStream)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index e22b1e53e8..c52d6175d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends Logging {
 
-  private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt
+  private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
     THREADS, "Result resolver thread")
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c8ed62759..d4f74d3e18 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,15 +51,15 @@ private[spark] class TaskSchedulerImpl(
     isLocal: Boolean = false)
   extends TaskScheduler with Logging {
 
-  def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt)
+  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
 
   val conf = sc.conf
 
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong
+  val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong
+  val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl(
   override def start() {
     backend.start()
 
-    if (!isLocal && conf.get("spark.speculation", "false").toBoolean) {
+    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6dd1469d8f..a10e5397ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -57,11 +57,11 @@ private[spark] class TaskSetManager(
   val conf = sched.sc.conf
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
+  val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -116,7 +116,7 @@ private[spark] class TaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    conf.get("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.getLong("spark.logging.exceptionPrintInterval", 10000)
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2f5bcafe40..8d596a76c2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -63,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
       // Periodically revive offers to allow delay scheduling to work
-      val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong
+      val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
       import context.dispatcher
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
@@ -209,8 +209,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
 
   override def defaultParallelism(): Int = {
-    conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse(
-      math.max(totalCoreCount.get(), 2))
+    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
   }
 
   // Called by subclasses when notified of a lost worker
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index b44d1e43c8..d99c76117c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class SimrSchedulerBackend(
   val tmpPath = new Path(driverFilePath + "_tmp")
   val filePath = new Path(driverFilePath)
 
-  val maxCores = conf.get("spark.simr.executor.cores", "1").toInt
+  val maxCores = conf.getInt("spark.simr.executor.cores", 1)
 
   override def start() {
     super.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d46fceba89..e16d60c54c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
 
-  val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt
+  val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
 
   var nextMesosTaskId = 0
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index ae8d527352..b428c82a48 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend(
   }
 
   // TODO: query Mesos for number of cores
-  override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt
+  override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
 }
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index a24a3b04b8..c14cd47556 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -36,7 +36,7 @@ import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
  */
 class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
   private val bufferSize = {
-    conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+    conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
   }
 
   def newKryoOutput() = new KryoOutput(bufferSize)
@@ -48,7 +48,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
     // Do this before we invoke the user registrator so the user registrator can override this.
-    kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean)
+    kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true))
 
     for (cls <- KryoSerializer.toRegister) kryo.register(cls)
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 47478631a1..4fa2ab96d9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
         fetchRequestsSync.put(request)
       }
 
-      copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt)
+      copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
       logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
         Utils.getUsedTimeMs(startTime))
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6d2cda97b0..c56e2ca2df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
   private val nettyPort: Int = {
-    val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
-    val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt
+    val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
+    val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
     if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
@@ -72,14 +72,14 @@ private[spark] class BlockManager(
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
   val maxBytesInFlight =
-    conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+    conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
 
   // Whether to compress broadcast variables that are stored
-  val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean
+  val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
   // Whether to compress shuffle output that are stored
-  val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean
+  val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
   // Whether to compress RDD partitions that are stored serialized
-  val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
+  val compressRdds = conf.getBoolean("spark.rdd.compress", false)
 
   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
 
@@ -443,7 +443,7 @@ private[spark] class BlockManager(
     : BlockFetcherIterator = {
 
     val iter =
-      if (conf.get("spark.shuffle.use.netty", "false").toBoolean) {
+      if (conf.getBoolean("spark.shuffle.use.netty", false)) {
         new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
       } else {
         new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean
+    val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
     new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
   }
 
@@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging {
   val ID_GENERATOR = new IdGenerator
 
   def getMaxMemory(conf: SparkConf): Long = {
-    val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble
+    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =
-    conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+    conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) / 4
 
   def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
-    conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+    conf.getBoolean("spark.test.disableBlockManagerHeartBeat", false)
 
   /**
    * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 51a29ed8ef..c54e4f2664 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -30,8 +30,8 @@ import org.apache.spark.util.AkkaUtils
 private[spark]
 class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
 
-  val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
+  val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
+  val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 55dcb3742c..edc1133172 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-  private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt
+  private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
 
   // Create one local directory for each path mentioned in spark.local.dir; then, inside this
   // directory, create multiple subdirectories that we will hash files into, in order to avoid
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 39dc7bb19a..e2b24298a5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -64,9 +64,9 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
-    conf.get("spark.shuffle.consolidateFiles", "false").toBoolean
+    conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d6d9f0cedf..bcd2824450 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
  */
 private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
   // How many stages to remember
-  val RETAINED_STAGES = sc.conf.get("spark.ui.retainedStages", "1000").toInt
+  val RETAINED_STAGES = sc.conf.getInt("spark.ui.retainedStages", 1000)
   val DEFAULT_POOL_NAME = "default"
 
   val stageIdToPool = new HashMap[Int, String]()
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 3f009a8998..761d378c7f 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,13 +44,13 @@ private[spark] object AkkaUtils {
   def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
     conf: SparkConf): (ActorSystem, Int) = {
 
-    val akkaThreads = conf.get("spark.akka.threads", "4").toInt
-    val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt
+    val akkaThreads = conf.getInt("spark.akka.threads", 4)
+    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
 
-    val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
+    val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-    val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
-    val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+    val akkaFrameSize = conf.getInt("spark.akka.frameSize", 10)
+    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
       // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
@@ -58,12 +58,12 @@ private[spark] object AkkaUtils {
       Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
     }
 
-    val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
+    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
 
-    val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
+    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 600)
     val akkaFailureDetector =
-      conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
-    val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
+      conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
+    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
 
     val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
       ConfigFactory.parseString(
@@ -103,7 +103,7 @@ private[spark] object AkkaUtils {
 
   /** Returns the default Spark timeout to use for Akka ask operations. */
   def askTimeout(conf: SparkConf): FiniteDuration = {
-    Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
+    Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
   }
 
   /** Returns the default Spark timeout to use for Akka remote actor lookup. */
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index aa7f52cafb..3d1e90a352 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -74,7 +74,7 @@ object MetadataCleanerType extends Enumeration {
 // initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
 object MetadataCleaner {
   def getDelaySeconds(conf: SparkConf) = {
-    conf.get("spark.cleaner.ttl", "3500").toInt
+    conf.getInt("spark.cleaner.ttl", 3500)
   }
 
   def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
index e9907e6c85..08b31ac64f 100644
--- a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
@@ -91,4 +91,4 @@ private[spark] object XORShiftRandom {
 
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1eec6726f4..c9f6cc5d07 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -83,7 +83,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   private val conf = new SparkConf
 
-  val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong
+  val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
   val MAX_TASK_FAILURES = 4
 
   test("TaskSet with no preferences") {
diff --git a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
index b78367b6ca..f1d7b61b31 100644
--- a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
@@ -73,4 +73,4 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
 
   }
 
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 27d474c0a0..d41f726f83 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,7 +175,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   private class NetworkReceiverActor extends Actor {
     logInfo("Attempting to register with tracker")
     val ip = env.conf.get("spark.driver.host", "localhost")
-    val port = env.conf.get("spark.driver.port", "7077").toInt
+    val port = env.conf.getInt("spark.driver.port", 7077)
     val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
     val tracker = env.actorSystem.actorSelection(url)
     val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
-    val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
+    val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7341bfbc99..c8ee93bf5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -38,4 +38,4 @@ class Job(val time: Time, func: () => _) {
   }
 
   override def toString = id
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 5f8be93a98..3c624e8199 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // or if the property is defined set it to that time
     if (clock.isInstanceOf[ManualClock]) {
       val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
+      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
       clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
     }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9304fc1a93..30c070c274 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()
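One hunk above is slightly more than a find-and-replace: `CoarseGrainedSchedulerBackend.defaultParallelism()` used the `getOption(...).map(_.toInt).getOrElse(fallback)` form, where `getOrElse` takes its argument by name and so evaluates the fallback lazily. With `getInt(key, fallback)` the fallback expression is evaluated eagerly on every call, but since `math.max(totalCoreCount.get(), 2)` is cheap and side-effect free, the result is identical. A self-contained sketch of that equivalence (the Map stands in for SparkConf's settings; illustrative only, not code from this patch):

      import java.util.concurrent.atomic.AtomicInteger

      object DefaultParallelismSketch extends App {
        val totalCoreCount = new AtomicInteger(8)    // stands in for the backend's core counter
        val settings = Map.empty[String, String]     // "spark.default.parallelism" unset

        // Old form: fallback evaluated lazily via getOrElse's by-name parameter.
        val oldStyle = settings.get("spark.default.parallelism").map(_.toInt)
          .getOrElse(math.max(totalCoreCount.get(), 2))

        // New form: fallback evaluated before the call; same result here.
        def getInt(key: String, defaultValue: Int): Int =
          settings.get(key).map(_.toInt).getOrElse(defaultValue)
        val newStyle = getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))

        assert(oldStyle == newStyle)                 // both are 8
      }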