author     Josh Rosen <joshrosen@databricks.com>    2015-09-24 14:18:33 -0700
committer  Reynold Xin <rxin@databricks.com>        2015-09-24 14:18:33 -0700
commit     8023242e77e4a799de6edc490078285684549b6d (patch)
tree       430082a41458ba7cb810d448839b1d9d96d0f912 /core
parent     b3862d3c59746ffb5f089aea4ff9e6f033a2c658 (diff)
[SPARK-10761] Refactor DiskBlockObjectWriter to not require BlockId
The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
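For context, here is a minimal caller-side sketch of the refactored API: the constructor without the BlockId argument, and the File now returned by revertPartialWritesAndClose(). It mirrors the test-suite usage in the diff below; the package declaration, temp file, and serializer setup are illustrative assumptions, not code from this patch.

package org.apache.spark.storage

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer

// Illustrative sketch only (not part of the patch), modeled on DiskBlockObjectWriterSuite below.
object DiskBlockObjectWriterSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("disk-block-object-writer-sketch", ".tmp")
    val writeMetrics = new ShuffleWriteMetrics()
    // No BlockId parameter any more; the writer is identified by its File alone.
    val writer = new DiskBlockObjectWriter(
      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
    writer.write(Long.box(20), Long.box(30))
    // revertPartialWritesAndClose() now returns the File it wrote to, so callers
    // can delete it directly instead of resolving it via DiskBlockManager and a BlockId.
    val revertedFile = writer.revertPartialWritesAndClose()
    if (!revertedFile.delete()) {
      System.err.println("Could not delete " + revertedFile.getAbsolutePath)
    }
  }
}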
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java             9
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala                   3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                                2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala                       7
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java               1
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java 1
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala      1
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala                  33
8 files changed, 27 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0b8b604e18..f5d80bbcf3 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -151,7 +151,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
} finally {
Closeables.close(in, copyThrewException);
}
- if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
+ if (!partitionWriters[i].fileSegment().file().delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
@@ -168,12 +168,11 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
public void stop() throws IOException {
if (partitionWriters != null) {
try {
- final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
for (DiskBlockObjectWriter writer : partitionWriters) {
// This method explicitly does _not_ throw exceptions:
- writer.revertPartialWritesAndClose();
- if (!diskBlockManager.getFile(writer.blockId()).delete()) {
- logger.error("Error while deleting file for block {}", writer.blockId());
+ File file = writer.revertPartialWritesAndClose();
+ if (!file.delete()) {
+ logger.error("Error while deleting file {}", file.getAbsolutePath());
}
}
} finally {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 65887d119d..5e4c2b5d0a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -119,9 +119,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
}
private[spark] object IndexShuffleBlockResolver {
- // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter.
+ // No-op reduce ID used in interactions with disk store.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
- // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
val NOOP_REDUCE_ID = 0
}
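(Editorial aside, not part of the patch: the comment above notes that the sort shuffle writes the outputs for several reduces into a single file per map task, keyed by this no-op reduce ID. A hedged sketch of how such a consolidated data file can be resolved follows; the getDataFile shape is modeled on Spark 1.5-era IndexShuffleBlockResolver and should be read as an assumption.)

package org.apache.spark.shuffle

import java.io.File

import org.apache.spark.SparkEnv
import org.apache.spark.storage.ShuffleDataBlockId

// Sketch (assumption): resolve the single per-map data file using the no-op reduce ID.
object NoopReduceIdSketch {
  def getDataFile(shuffleId: Int, mapId: Int): File =
    SparkEnv.get.blockManager.diskBlockManager.getFile(
      ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID))
}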
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index bca3942f8c..47bd2ef8b2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -669,7 +669,7 @@ private[spark] class BlockManager(
writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
- new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream,
+ new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
syncWrites, writeMetrics)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 49d9154f95..80d426fadc 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -34,7 +34,6 @@ import org.apache.spark.util.Utils
* reopened again.
*/
private[spark] class DiskBlockObjectWriter(
- val blockId: BlockId,
file: File,
serializerInstance: SerializerInstance,
bufferSize: Int,
@@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter(
* Reverts writes that haven't been flushed yet. Callers should invoke this function
* when there are runtime exceptions. This method will not throw, though it may be
* unsuccessful in truncating written data.
+ *
+ * @return the file that this DiskBlockObjectWriter wrote to.
*/
- def revertPartialWritesAndClose() {
+ def revertPartialWritesAndClose(): File = {
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
try {
@@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter(
val truncateStream = new FileOutputStream(file, true)
try {
truncateStream.getChannel.truncate(initialPosition)
+ file
} finally {
truncateStream.close()
}
} catch {
case e: Exception =>
logError("Uncaught exception while reverting partial writes to file " + file, e)
+ file
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index a266b0c36e..d218344cd4 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -129,7 +129,6 @@ public class UnsafeShuffleWriterSuite {
Object[] args = invocationOnMock.getArguments();
return new DiskBlockObjectWriter(
- (BlockId) args[0],
(File) args[1],
(SerializerInstance) args[2],
(Integer) args[3],
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 445a37b83e..a5bbaa95fa 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -127,7 +127,6 @@ public class UnsafeExternalSorterSuite {
Object[] args = invocationOnMock.getArguments();
return new DiskBlockObjectWriter(
- (BlockId) args[0],
(File) args[1],
(SerializerInstance) args[2],
(Integer) args[3],
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index cc7342f1ec..341f56df2d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -72,7 +72,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
val args = invocation.getArguments
new DiskBlockObjectWriter(
- args(0).asInstanceOf[BlockId],
args(1).asInstanceOf[File],
args(2).asInstanceOf[SerializerInstance],
args(3).asInstanceOf[Int],
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 66af6e1a79..7c19531c18 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -20,7 +20,6 @@ import java.io.File
import org.scalatest.BeforeAndAfterEach
-import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer
@@ -41,8 +40,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("verify write metrics") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20), Long.box(30))
// Record metrics update on every write
@@ -63,8 +62,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("verify write metrics on revert") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20), Long.box(30))
// Record metrics update on every write
@@ -86,8 +85,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("Reopening a closed block writer") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.open()
writer.close()
@@ -99,8 +98,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
for (i <- 1 to 1000) {
writer.write(i, i)
}
@@ -115,8 +114,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("commitAndClose() should be idempotent") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
for (i <- 1 to 1000) {
writer.write(i, i)
}
@@ -133,8 +132,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("revertPartialWritesAndClose() should be idempotent") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
for (i <- 1 to 1000) {
writer.write(i, i)
}
@@ -151,8 +150,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("fileSegment() can only be called after commitAndClose() has been called") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
for (i <- 1 to 1000) {
writer.write(i, i)
}
@@ -165,8 +164,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
test("commitAndClose() without ever opening or writing") {
val file = new File(tempDir, "somefile")
val writeMetrics = new ShuffleWriteMetrics()
- val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+ val writer = new DiskBlockObjectWriter(
+ file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.commitAndClose()
assert(writer.fileSegment().length === 0)
}