From a9bcc980b693bf5b0959caccf74367fc70348041 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 20 Jan 2014 23:42:24 -0800
Subject: Style clean-up

---
 .../scala/org/apache/spark/storage/ShuffleBlockManager.scala |  9 ++++++---
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 11 +++--------
 2 files changed, 9 insertions(+), 11 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 173c3291b1..bb07c8cb13 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -109,9 +109,12 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         val blockFile = blockManager.diskBlockManager.getFile(blockId)
         // Because of previous failures, the shuffle file may already exist on this machine.
         // If so, remove it.
-        if (blockFile.exists()) {
-          val removed = blockFile.delete()
-          logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
+        if (blockFile.exists) {
+          if (blockFile.delete()) {
+            logInfo(s"Removed existing shuffle file $blockFile")
+          } else {
+            logWarning(s"Failed to remove existing shuffle file $blockFile")
+          }
         }
         blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
       }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 792f29de60..fb73636162 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -173,15 +173,10 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
      * most likely require using the file channel API.
      */
-    val codec = new LZFCompressionCodec(sparkConf)
-
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
     def wrapForCompression(outputStream: OutputStream) = {
-      blockManager.shouldCompress(blockId) match {
-        case true =>
-          codec.compressedOutputStream(outputStream)
-        case false =>
-          outputStream
-      }
+      if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
     }

     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
--
cgit v1.2.3
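
A note on the first hunk: java.io.File.delete() signals failure by returning false rather than throwing, so the result has to be branched on explicitly before anything useful can be logged. The old code logged at INFO level regardless, so a failed delete ("successfully: false") was easy to miss; splitting the branches surfaces the failure at WARN level. A minimal standalone sketch of the same pattern, using println in place of Spark's Logging trait (the temp file below is a stand-in for a real shuffle block file):

import java.io.File

object RemoveExistingFileSketch {
  // Mirrors the patched ShuffleBlockManager logic: if an earlier, failed
  // attempt left the file behind, remove it and report how that went.
  def removeIfExists(blockFile: File): Unit = {
    if (blockFile.exists) {
      if (blockFile.delete()) {
        println(s"INFO: Removed existing shuffle file $blockFile")
      } else {
        println(s"WARN: Failed to remove existing shuffle file $blockFile")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical block file; a real one would come from diskBlockManager.getFile.
    val f = File.createTempFile("shuffle_0_0_0", ".data")
    removeIfExists(f)
  }
}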
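
The second hunk makes two separate style fixes: matching on a Boolean with case true/case false is replaced by a plain if/else expression, and the blockManager.shouldCompress(blockId) lookup is hoisted out of wrapForCompression so it is evaluated once rather than on every call. A standalone sketch of the same shape, with java.util.zip.GZIPOutputStream standing in for Spark's LZFCompressionCodec (which needs the spark-core dependency):

import java.io.{ByteArrayOutputStream, OutputStream}
import java.util.zip.GZIPOutputStream

object WrapForCompressionSketch {
  // Decided once, up front; in Spark this is blockManager.shouldCompress(blockId).
  val shouldCompress: Boolean = true

  // Before (non-idiomatic): shouldCompress match { case true => ...; case false => ... }
  // After: an if/else expression, with the flag captured from the enclosing scope.
  def wrapForCompression(outputStream: OutputStream): OutputStream =
    if (shouldCompress) new GZIPOutputStream(outputStream) else outputStream

  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    val out = wrapForCompression(sink)
    out.write("hello, shuffle".getBytes("UTF-8"))
    out.close()
    println(s"wrote ${sink.size} bytes (compressed: $shouldCompress)")
  }
}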