Diffstat (limited to 'core/src/main/scala/org/apache')
 core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 3 +--
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala              | 2 +-
 core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala     | 7 +++++--
 3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 65887d119d..5e4c2b5d0a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -119,9 +119,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
}
private[spark] object IndexShuffleBlockResolver {
- // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter.
+ // No-op reduce ID used in interactions with disk store.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
- // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
val NOOP_REDUCE_ID = 0
}
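
The comment kept above explains why the placeholder exists: the sort-based shuffle writes all reduce partitions for one map task into a single data file, so the block ID naming that file carries a dummy reduce component. A minimal sketch of that naming convention, assuming the ShuffleDataBlockId case class from org.apache.spark.storage (not part of this patch):

    import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
    import org.apache.spark.storage.ShuffleDataBlockId

    // Name the single per-map output file: every reduce partition for this map task
    // lives in the same file, so the reduce slot of the block ID is the 0 placeholder.
    def dataBlockIdFor(shuffleId: Int, mapId: Int): ShuffleDataBlockId =
      ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)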
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index bca3942f8c..47bd2ef8b2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -669,7 +669,7 @@ private[spark] class BlockManager(
writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
- new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream,
+ new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
syncWrites, writeMetrics)
}
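
For reference, a hedged sketch of how a caller goes through getDiskWriter after this change: the BlockId is still passed in, but it now only selects the compression wrapper via wrapForCompression and is no longer stored in the returned writer. The blockManager, blockId, file, serializer, records and writeMetrics names here are assumptions for illustration, as are the write/commitAndClose calls on the writer:

    // Obtain a disk writer for a block; the writer itself no longer knows its BlockId.
    val writer = blockManager.getDiskWriter(
      blockId,                    // used only to pick the right compression wrapper
      file,                       // destination file on disk
      serializer.newInstance(),   // per-writer serializer instance
      32 * 1024,                  // buffer size in bytes
      writeMetrics)
    records.foreach { case (key, value) => writer.write(key, value) }
    writer.commitAndClose()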
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 49d9154f95..80d426fadc 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -34,7 +34,6 @@ import org.apache.spark.util.Utils
* reopened again.
*/
private[spark] class DiskBlockObjectWriter(
- val blockId: BlockId,
file: File,
serializerInstance: SerializerInstance,
bufferSize: Int,
@@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter(
* Reverts writes that haven't been flushed yet. Callers should invoke this function
* when there are runtime exceptions. This method will not throw, though it may be
* unsuccessful in truncating written data.
+ *
+ * @return the file that this DiskBlockObjectWriter wrote to.
*/
- def revertPartialWritesAndClose() {
+ def revertPartialWritesAndClose(): File = {
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
try {
@@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter(
val truncateStream = new FileOutputStream(file, true)
try {
truncateStream.getChannel.truncate(initialPosition)
+ file
} finally {
truncateStream.close()
}
} catch {
case e: Exception =>
logError("Uncaught exception while reverting partial writes to file " + file, e)
+ file
}
}
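
The new return value lets a caller clean up the file it was writing when something goes wrong: revert the unflushed writes, then act on the file that comes back. A hedged sketch of that pattern (the writer, records and logWarning names are assumptions for illustration; only revertPartialWritesAndClose(): File comes from this patch):

    try {
      records.foreach { case (key, value) => writer.write(key, value) }
      writer.commitAndClose()
    } catch {
      case t: Throwable =>
        // Roll back unflushed writes, then delete the (now truncated) file outright.
        val file = writer.revertPartialWritesAndClose()
        if (!file.delete()) {
          logWarning(s"Error deleting file $file")
        }
        throw t
    }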