 core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala        | 14
 core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala        |  3
 core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala             | 53
 core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala           | 28
 core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala |  2
 core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala        |  6
 core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala         | 87
 tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala              |  5
 8 files changed, 146 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 1923f7c71a..45d3b8b9b8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V](
}
/** Close this writer, passing along whether the map completed */
- override def stop(success: Boolean): Option[MapStatus] = {
+ override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+ var success = initiallySuccess
try {
if (stopping) {
return None
@@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V](
stopping = true
if (success) {
try {
- return Some(commitWritesAndBuildStatus())
+ Some(commitWritesAndBuildStatus())
} catch {
case e: Exception =>
+ success = false
revertWrites()
throw e
}
} else {
revertWrites()
- return None
+ None
}
} finally {
// Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
var totalBytes = 0L
var totalTime = 0L
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
- writer.commit()
- writer.close()
+ writer.commitAndClose()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
private def revertWrites(): Unit = {
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
- writer.revertPartialWrites()
- writer.close()
+ writer.revertPartialWritesAndClose()
}
}
}
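
The stop() path above is easier to follow as a standalone sketch. The trait and helper below are hypothetical stand-ins (not Spark's actual classes); they only illustrate the intended writer lifecycle after this change: commitAndClose() on success, revertPartialWritesAndClose() on any failure.

    // Hypothetical stand-ins for BlockObjectWriter and FileSegment, showing the intended
    // lifecycle after this change. Neither type is Spark's real class.
    object WriterLifecycleSketch {
      case class FileSegmentSketch(offset: Long, length: Long)

      trait SketchWriter {
        def write(value: Any): Unit
        def commitAndClose(): Unit                // replaces the old commit(); close() pair
        def revertPartialWritesAndClose(): Unit   // replaces revertPartialWrites(); close()
        def fileSegment(): FileSegmentSketch      // only meaningful after commitAndClose()
      }

      // Success path: write everything, commit-and-close, then read the segment.
      // Failure path: revert (which also closes) and rethrow, mirroring HashShuffleWriter.stop.
      def writeAll(writer: SketchWriter, records: Iterator[Any]): FileSegmentSketch = {
        try {
          records.foreach(writer.write)
          writer.commitAndClose()
          writer.fileSegment()
        } catch {
          case e: Exception =>
            writer.revertPartialWritesAndClose()
            throw e
        }
      }
    }
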
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 42fcd07fa1..9a356d0dba 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
for (elem <- elements) {
writer.write(elem)
}
- writer.commit()
- writer.close()
+ writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index a2687e6be4..01d46e1ffc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
def isOpen: Boolean
/**
- * Flush the partial writes and commit them as a single atomic block. Return the
- * number of bytes written for this commit.
+ * Flush the partial writes and commit them as a single atomic block.
*/
- def commit(): Long
+ def commitAndClose(): Unit
/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
- * when there are runtime exceptions.
+ * when there are runtime exceptions. This method will not throw, though it may be
+ * unsuccessful in truncating written data.
*/
- def revertPartialWrites()
+ def revertPartialWritesAndClose()
/**
* Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
/**
* Returns the file segment of committed data that this Writer has written.
+ * This is only valid after commitAndClose() has been called.
*/
def fileSegment(): FileSegment
@@ -108,7 +109,7 @@ private[spark] class DiskBlockObjectWriter(
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private val initialPosition = file.length()
- private var lastValidPosition = initialPosition
+ private var finalPosition: Long = -1
private var initialized = false
private var _timeWriting = 0L
@@ -116,7 +117,6 @@ private[spark] class DiskBlockObjectWriter(
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
- lastValidPosition = initialPosition
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
override def isOpen: Boolean = objOut != null
- override def commit(): Long = {
+ override def commitAndClose(): Unit = {
if (initialized) {
// NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
- val prevPos = lastValidPosition
- lastValidPosition = channel.position()
- lastValidPosition - prevPos
- } else {
- // lastValidPosition is zero if stream is uninitialized
- lastValidPosition
+ close()
}
+ finalPosition = file.length()
}
- override def revertPartialWrites() {
- if (initialized) {
- // Discard current writes. We do this by flushing the outstanding writes and
- // truncate the file to the last valid position.
- objOut.flush()
- bs.flush()
- channel.truncate(lastValidPosition)
+ // Discard current writes. We do this by flushing the outstanding writes and then
+ // truncating the file to its initial position.
+ override def revertPartialWritesAndClose() {
+ try {
+ if (initialized) {
+ objOut.flush()
+ bs.flush()
+ close()
+ }
+
+ val truncateStream = new FileOutputStream(file, true)
+ try {
+ truncateStream.getChannel.truncate(initialPosition)
+ } finally {
+ truncateStream.close()
+ }
+ } catch {
+ case e: Exception =>
+ logError("Uncaught exception while reverting partial writes to file " + file, e)
}
}
@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(
// Only valid if called after commit()
override def bytesWritten: Long = {
- lastValidPosition - initialPosition
+ assert(finalPosition != -1, "bytesWritten is only valid after successful commitAndClose()")
+ finalPosition - initialPosition
}
}
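
The revert logic above can be exercised outside of Spark with plain java.io, under the assumption that the file is only ever appended to. The helper below is an illustration, not Spark code: it appends, then undoes the append by truncating back to the position recorded before writing, so bytes written by earlier, already-committed writers are preserved.

    import java.io.{File, FileOutputStream}

    // Standalone illustration of the truncate-to-initialPosition revert strategy.
    object TruncateRevertSketch {
      def appendThenRevert(file: File, data: Array[Byte]): Unit = {
        val initialPosition = file.length()
        val out = new FileOutputStream(file, true)   // append mode, like DiskBlockObjectWriter
        try {
          out.write(data)
          out.flush()
        } finally {
          out.close()
        }
        // Revert: reopen just to get a channel, then truncate the file to its initial position.
        val truncateStream = new FileOutputStream(file, true)
        try {
          truncateStream.getChannel.truncate(initialPosition)
        } finally {
          truncateStream.close()
        }
      }
    }
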
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 7beb55c411..28aa35bc7e 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
if (consolidateShuffleFiles) {
if (success) {
val offsets = writers.map(_.fileSegment().offset)
- fileGroup.recordMapOutput(mapId, offsets)
+ val lengths = writers.map(_.fileSegment().length)
+ fileGroup.recordMapOutput(mapId, offsets, lengths)
}
recycleFileGroup(fileGroup)
} else {
@@ -247,6 +248,8 @@ object ShuffleBlockManager {
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
*/
private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+ private var numBlocks: Int = 0
+
/**
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
@@ -254,23 +257,27 @@ object ShuffleBlockManager {
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
/**
- * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
- * This ordering allows us to compute block lengths by examining the following block offset.
+ * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+ * position in the file.
* Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
* reducer.
*/
private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}
-
- def numBlocks = mapIdToIndex.size
+ private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+ new PrimitiveVector[Long]()
+ }
def apply(bucketId: Int) = files(bucketId)
- def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+ def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+ assert(offsets.length == lengths.length)
mapIdToIndex(mapId) = numBlocks
+ numBlocks += 1
for (i <- 0 until offsets.length) {
blockOffsetsByReducer(i) += offsets(i)
+ blockLengthsByReducer(i) += lengths(i)
}
}
@@ -278,16 +285,11 @@ object ShuffleBlockManager {
def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
val file = files(reducerId)
val blockOffsets = blockOffsetsByReducer(reducerId)
+ val blockLengths = blockLengthsByReducer(reducerId)
val index = mapIdToIndex.getOrElse(mapId, -1)
if (index >= 0) {
val offset = blockOffsets(index)
- val length =
- if (index + 1 < numBlocks) {
- blockOffsets(index + 1) - offset
- } else {
- file.length() - offset
- }
- assert(length >= 0)
+ val length = blockLengths(index)
Some(new FileSegment(file, offset, length))
} else {
None
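
The point of the ShuffleFileGroup change is that each map task's offset and length are now recorded explicitly per reducer file, so a segment's length never has to be inferred from the next block's offset or the current file size. The class below is a simplified, self-contained model of that bookkeeping (FileGroupSketch is a hypothetical stand-in, not the real ShuffleFileGroup).

    import scala.collection.mutable

    // Simplified model: record (offset, length) per reducer file for each map task,
    // keyed by the map's insertion index, mirroring mapIdToIndex in the diff.
    class FileGroupSketch(numReducers: Int) {
      private var numBlocks = 0
      private val mapIdToIndex = mutable.Map[Int, Int]()
      private val offsetsByReducer = Array.fill(numReducers)(mutable.ArrayBuffer[Long]())
      private val lengthsByReducer = Array.fill(numReducers)(mutable.ArrayBuffer[Long]())

      def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
        require(offsets.length == lengths.length)
        mapIdToIndex(mapId) = numBlocks
        numBlocks += 1
        for (i <- offsets.indices) {
          offsetsByReducer(i) += offsets(i)
          lengthsByReducer(i) += lengths(i)
        }
      }

      // (offset, length) of this map's block in the given reducer file, if it was recorded.
      def segmentFor(mapId: Int, reducerId: Int): Option[(Long, Long)] =
        mapIdToIndex.get(mapId).map { idx =>
          (offsetsByReducer(reducerId)(idx), lengthsByReducer(reducerId)(idx))
        }
    }
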
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index b34512ef9e..cb67a1c039 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Flush the disk writer's contents to disk, and update relevant variables
def flush() = {
- writer.commit()
+ writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 54c3310744..6e415a2bd8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
// How many elements we have in each partition
val elementsPerPartition = new Array[Long](numPartitions)
- // Flush the disk writer's contents to disk, and update relevant variables
+ // Flush the disk writer's contents to disk, and update relevant variables.
+ // The writer is closed at the end of this process, and cannot be reused.
def flush() = {
- writer.commit()
+ writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](
if (objectsWritten == serializerBatchSize) {
flush()
- writer.close()
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
}
}
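
Because commitAndClose() also closes the writer, the spill loops in ExternalSorter and ExternalAppendOnlyMap must open a fresh writer for every batch instead of reusing one across flushes. The sketch below uses placeholder names (BatchWriter, openWriter), not Spark APIs, to show that batching pattern.

    // Sketch of the spill loop's batching pattern after this change.
    object SpillLoopSketch {
      trait BatchWriter {
        def write(value: Any): Unit
        def commitAndClose(): Unit
        def bytesWritten: Long        // valid only after commitAndClose()
      }

      def spill(records: Iterator[Any], batchSize: Int, openWriter: () => BatchWriter): Seq[Long] = {
        val batchSizes = scala.collection.mutable.ArrayBuffer[Long]()
        var writer = openWriter()
        var objectsWritten = 0

        def flush(): Unit = {
          writer.commitAndClose()            // closes the writer; no separate close() call anymore
          batchSizes += writer.bytesWritten
          objectsWritten = 0
        }

        for (record <- records) {
          writer.write(record)
          objectsWritten += 1
          if (objectsWritten == batchSize) {
            flush()
            writer = openWriter()            // reopen for the next batch
          }
        }
        if (objectsWritten > 0) flush()
        batchSizes.toSeq
      }
    }
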
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aaa7714049..985ac93947 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
import scala.collection.mutable
import scala.language.reflectiveCalls
+import akka.actor.Props
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AkkaUtils, Utils}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
newFile.delete()
}
+ private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+ assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+ assert (segment1.offset === segment2.offset)
+ assert (segment1.length === segment2.length)
+ }
+
+ test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
+
+ val serializer = new JavaSerializer(testConf)
+ val confCopy = testConf.clone
+ // Reset after EACH object write. This ensures that there are bytes appended after an object
+ // is written, so if a code path assumes writeObject marks the end of the data, this should
+ // flush those bugs out. This was a common bug in ExternalAppendOnlyMap, etc.
+ confCopy.set("spark.serializer.objectStreamReset", "1")
+
+ val securityManager = new org.apache.spark.SecurityManager(confCopy)
+ // Do not use the shuffleBlockManager above!
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
+ securityManager)
+ val master = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
+ confCopy)
+ val store = new BlockManager("<driver>", actorSystem, master, serializer, confCopy,
+ securityManager, null)
+
+ try {
+
+ val shuffleManager = store.shuffleBlockManager
+
+ val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
+ for (writer <- shuffle1.writers) {
+ writer.write("test1")
+ writer.write("test2")
+ }
+ for (writer <- shuffle1.writers) {
+ writer.commitAndClose()
+ }
+
+ val shuffle1Segment = shuffle1.writers(0).fileSegment()
+ shuffle1.releaseWriters(success = true)
+
+ val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))
+
+ for (writer <- shuffle2.writers) {
+ writer.write("test3")
+ writer.write("test4")
+ }
+ for (writer <- shuffle2.writers) {
+ writer.commitAndClose()
+ }
+ val shuffle2Segment = shuffle2.writers(0).fileSegment()
+ shuffle2.releaseWriters(success = true)
+
+ // Now comes the test:
+ // Write to shuffle 3 and close it, but before registering it, check that the file segment for
+ // a previously registered task (shuffle2, checked below) is still the same as what was recorded.
+ // Earlier, we inferred a block's length from the remaining data in the file, which could mess
+ // things up when there are concurrent reads and writes to the same shuffle group.
+
+ val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
+ for (writer <- shuffle3.writers) {
+ writer.write("test3")
+ writer.write("test4")
+ }
+ for (writer <- shuffle3.writers) {
+ writer.commitAndClose()
+ }
+ // check before we register.
+ checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+ shuffle3.releaseWriters(success = true)
+ checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+ shuffleManager.removeShuffle(1)
+ } finally {
+
+ if (store != null) {
+ store.stop()
+ }
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ }
+ }
+
def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
val segment = diskBlockManager.getBlockLocation(blockId)
assert(segment.file.getName === filename)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c35615a..8a05fcb449 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
for (i <- 1 to recordsPerMap) {
writers(i % numOutputSplits).write(writeData)
}
- writers.map {w =>
- w.commit()
+ writers.map { w =>
+ w.commitAndClose()
total.addAndGet(w.fileSegment().length)
- w.close()
}
shuffle.releaseWriters(true)