Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  24
1 file changed, 12 insertions(+), 12 deletions(-)
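
The diff below swaps every `conf.getOrElse(key, default)` call for `conf.get(key, default)`. Both forms return the configured string value, falling back to the supplied default when the key is unset; only the method name changes. As a minimal sketch of the call shape — assuming the real org.apache.spark.SparkConf class, with only the config keys and defaults taken from the diff:

```scala
import org.apache.spark.SparkConf

object ConfGetExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false) // skip loading system properties, for a clean demo
    // Unset key: the two-argument get returns the default string.
    val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
    // Set key: the stored value wins over the default.
    conf.set("spark.reducer.maxMbInFlight", "96")
    val maxMb = conf.get("spark.reducer.maxMbInFlight", "48").toLong
    println(s"useNetty=$useNetty, maxMb=$maxMb") // useNetty=false, maxMb=96
  }
}
```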
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 16ee208617..6d2cda97b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -49,7 +49,7 @@ private[spark] class BlockManager(
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
- val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean
- val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt
+ val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
+ val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
}
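
For context, this hunk starts the Netty shuffle sender only when spark.shuffle.use.netty is enabled, with port 0 meaning "let the OS pick an ephemeral port". A hedged stand-alone sketch of that pattern — `chooseNettyPort` and `startSender` are illustrative names standing in for the inline logic and diskBlockManager.startShuffleBlockSender:

```scala
// Illustrative only: read the two settings, and bind a sender only when
// Netty shuffle is turned on.
def chooseNettyPort(conf: SparkConf, startSender: Int => Int): Int = {
  val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
  val requestedPort = conf.get("spark.shuffle.sender.port", "0").toInt // 0 = ephemeral port
  if (useNetty) startSender(requestedPort) else 0 // 0 also means "no sender running"
}
```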
@@ -72,14 +72,14 @@ private[spark] class BlockManager(
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
val maxBytesInFlight =
- conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+ conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
// Whether to compress broadcast variables that are stored
- val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
+ val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean
// Whether to compress shuffle output that is stored
- val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean
+ val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
- val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean
+ val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
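
Each of these settings follows the same read-then-convert pattern: fetch the raw string with a default, then call toBoolean or toLong. A small worked example using the defaults from this hunk (sketch only):

```scala
val conf = new SparkConf(false)
// 48 MB expressed in bytes, matching the maxBytesInFlight computation above.
val maxBytesInFlight = conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
assert(maxBytesInFlight == 50331648L) // 48 * 1024 * 1024

// Broadcast and shuffle compression default to on; RDD compression defaults to off.
val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
assert(!compressRdds)
```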
@@ -443,7 +443,7 @@ private[spark] class BlockManager(
: BlockFetcherIterator = {
val iter =
- if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) {
+ if (conf.get("spark.shuffle.use.netty", "false").toBoolean) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
} else {
new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
- val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean
+ val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
}
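
The syncWrites flag read here controls whether the disk writer forces data to the device on each commit rather than leaving it to OS write-back caching. A hypothetical illustration of that effect using plain java.io — `writeSynced` is not Spark code:

```scala
import java.io.{File, FileOutputStream}

// Hypothetical sketch: when spark.shuffle.sync is true, force bytes to
// stable storage after each write; slower, but crash-safe.
def writeSynced(file: File, bytes: Array[Byte], syncWrites: Boolean): Unit = {
  val out = new FileOutputStream(file)
  try {
    out.write(bytes)
    if (syncWrites) out.getFD.sync() // fsync the file descriptor
  } finally {
    out.close()
  }
}
```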
@@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging {
val ID_GENERATOR = new IdGenerator
def getMaxMemory(conf: SparkConf): Long = {
- val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble
+ val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
def getHeartBeatFrequency(conf: SparkConf): Long =
- conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+ conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
- conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+ conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
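
To make the two computations above concrete, a worked example with the defaults shown and a hypothetical 1 GiB maximum heap:

```scala
// getMaxMemory: 66% of the JVM max heap goes to the block store.
val maxHeap = 1024L * 1024 * 1024       // hypothetical 1 GiB heap
val maxMemory = (maxHeap * 0.66).toLong // 708669603 bytes, roughly 675 MiB

// getHeartBeatFrequency: a quarter of the default 60000 ms timeout interval.
val heartBeatFrequency = 60000L / 4     // 15000 ms
```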
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that