diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 16ee208617..6d2cda97b0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -49,7 +49,7 @@ private[spark] class BlockManager( val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, - conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir"))) + conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] @@ -58,8 +58,8 @@ private[spark] class BlockManager( // If we use Netty for shuffle, start a new Netty-based shuffle sender service. private val nettyPort: Int = { - val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean - val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt + val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean + val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0 } @@ -72,14 +72,14 @@ private[spark] class BlockManager( // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) val maxBytesInFlight = - conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 + conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 // Whether to compress broadcast variables that are stored - val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean + val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean // Whether to compress shuffle output that are stored - val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean + val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean // Whether to compress RDD partitions that are stored serialized - val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean + val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) @@ -443,7 +443,7 @@ private[spark] class BlockManager( : BlockFetcherIterator = { val iter = - if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) { + if (conf.get("spark.shuffle.use.netty", "false").toBoolean) { new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer) } else { new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer) @@ -469,7 +469,7 @@ private[spark] class BlockManager( def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) - val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean + val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites) } @@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging { val ID_GENERATOR = new IdGenerator def getMaxMemory(conf: SparkConf): Long = { - val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble + val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble (Runtime.getRuntime.maxMemory * memoryFraction).toLong } def getHeartBeatFrequency(conf: SparkConf): Long = - conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4 + conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4 def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean = - conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean + conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean /** * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that |