diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-09-24 14:18:33 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-09-24 14:18:33 -0700 |
commit | 8023242e77e4a799de6edc490078285684549b6d (patch) | |
tree | 430082a41458ba7cb810d448839b1d9d96d0f912 /core | |
parent | b3862d3c59746ffb5f089aea4ff9e6f033a2c658 (diff) | |
download | spark-8023242e77e4a799de6edc490078285684549b6d.tar.gz spark-8023242e77e4a799de6edc490078285684549b6d.tar.bz2 spark-8023242e77e4a799de6edc490078285684549b6d.zip |
[SPARK-10761] Refactor DiskBlockObjectWriter to not require BlockId
The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
Diffstat (limited to 'core')
8 files changed, 27 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 0b8b604e18..f5d80bbcf3 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -151,7 +151,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter< } finally { Closeables.close(in, copyThrewException); } - if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) { + if (!partitionWriters[i].fileSegment().file().delete()) { logger.error("Unable to delete file for partition {}", i); } } @@ -168,12 +168,11 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter< public void stop() throws IOException { if (partitionWriters != null) { try { - final DiskBlockManager diskBlockManager = blockManager.diskBlockManager(); for (DiskBlockObjectWriter writer : partitionWriters) { // This method explicitly does _not_ throw exceptions: - writer.revertPartialWritesAndClose(); - if (!diskBlockManager.getFile(writer.blockId()).delete()) { - logger.error("Error while deleting file for block {}", writer.blockId()); + File file = writer.revertPartialWritesAndClose(); + if (!file.delete()) { + logger.error("Error while deleting file {}", file.getAbsolutePath()); } } } finally { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 65887d119d..5e4c2b5d0a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -119,9 +119,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB } private[spark] object IndexShuffleBlockResolver { - // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter. + // No-op reduce ID used in interactions with disk store. // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort // shuffle outputs for several reduces are glommed into a single file. - // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId. val NOOP_REDUCE_ID = 0 } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index bca3942f8c..47bd2ef8b2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -669,7 +669,7 @@ private[spark] class BlockManager( writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) val syncWrites = conf.getBoolean("spark.shuffle.sync", false) - new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream, + new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream, syncWrites, writeMetrics) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 49d9154f95..80d426fadc 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -34,7 +34,6 @@ import org.apache.spark.util.Utils * reopened again. */ private[spark] class DiskBlockObjectWriter( - val blockId: BlockId, file: File, serializerInstance: SerializerInstance, bufferSize: Int, @@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter( * Reverts writes that haven't been flushed yet. Callers should invoke this function * when there are runtime exceptions. This method will not throw, though it may be * unsuccessful in truncating written data. + * + * @return the file that this DiskBlockObjectWriter wrote to. */ - def revertPartialWritesAndClose() { + def revertPartialWritesAndClose(): File = { // Discard current writes. We do this by flushing the outstanding writes and then // truncating the file to its initial position. try { @@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter( val truncateStream = new FileOutputStream(file, true) try { truncateStream.getChannel.truncate(initialPosition) + file } finally { truncateStream.close() } } catch { case e: Exception => logError("Uncaught exception while reverting partial writes to file " + file, e) + file } } diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index a266b0c36e..d218344cd4 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -129,7 +129,6 @@ public class UnsafeShuffleWriterSuite { Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( - (BlockId) args[0], (File) args[1], (SerializerInstance) args[2], (Integer) args[3], diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 445a37b83e..a5bbaa95fa 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -127,7 +127,6 @@ public class UnsafeExternalSorterSuite { Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( - (BlockId) args[0], (File) args[1], (SerializerInstance) args[2], (Integer) args[3], diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index cc7342f1ec..341f56df2d 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -72,7 +72,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = { val args = invocation.getArguments new DiskBlockObjectWriter( - args(0).asInstanceOf[BlockId], args(1).asInstanceOf[File], args(2).asInstanceOf[SerializerInstance], args(3).asInstanceOf[Int], diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index 66af6e1a79..7c19531c18 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -20,7 +20,6 @@ import java.io.File import org.scalatest.BeforeAndAfterEach -import org.apache.spark.SparkConf import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.serializer.JavaSerializer @@ -41,8 +40,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("verify write metrics") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write @@ -63,8 +62,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("verify write metrics on revert") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write @@ -86,8 +85,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("Reopening a closed block writer") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.open() writer.close() @@ -99,8 +98,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -115,8 +114,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("commitAndClose() should be idempotent") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -133,8 +132,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("revertPartialWritesAndClose() should be idempotent") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -151,8 +150,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("fileSegment() can only be called after commitAndClose() has been called") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) for (i <- 1 to 1000) { writer.write(i, i) } @@ -165,8 +164,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { test("commitAndClose() without ever opening or writing") { val file = new File(tempDir, "somefile") val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + val writer = new DiskBlockObjectWriter( + file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.commitAndClose() assert(writer.fileSegment().length === 0) } |