aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-08-01 13:57:19 -0700
committerMatei Zaharia <matei@databricks.com>2014-08-01 13:57:19 -0700
commit78f2af582286b81e6dc9fa9d455ed2b369d933bd (patch)
treed1b31d36d51ef5c6442ff8f6d19434c09d61f11c
parentb270309d7608fb749e402cd5afd36087446be398 (diff)
downloadspark-78f2af582286b81e6dc9fa9d455ed2b369d933bd.tar.gz
spark-78f2af582286b81e6dc9fa9d455ed2b369d933bd.tar.bz2
spark-78f2af582286b81e6dc9fa9d455ed2b369d933bd.zip
SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation
All changes from this PR are by mridulm and are drawn from his work in #1609. This patch is intended to fix all major issues related to shuffle file consolidation that mridulm found, while minimizing changes to the code, with the hope that it may be more easily merged into 1.1. This patch is **not** intended as a replacement for #1609, which provides many additional benefits, including fixes to ExternalAppendOnlyMap, improvements to DiskBlockObjectWriter's API, and several new unit tests. If it is feasible to merge #1609 for the 1.1 deadline, that is a preferable option. Author: Aaron Davidson <aaron@databricks.com> Closes #1678 from aarondav/consol and squashes the following commits: 53b3f6d [Aaron Davidson] Correct behavior when writing unopened file 701d045 [Aaron Davidson] Rebase with sort-based shuffle 9160149 [Aaron Davidson] SPARK-2532: Minimal shuffle consolidation fixes
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala53
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala28
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala87
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala5
8 files changed, 146 insertions, 52 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 1923f7c71a..45d3b8b9b8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V](
}
/** Close this writer, passing along whether the map completed */
- override def stop(success: Boolean): Option[MapStatus] = {
+ override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+ var success = initiallySuccess
try {
if (stopping) {
return None
@@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V](
stopping = true
if (success) {
try {
- return Some(commitWritesAndBuildStatus())
+ Some(commitWritesAndBuildStatus())
} catch {
case e: Exception =>
+ success = false
revertWrites()
throw e
}
} else {
revertWrites()
- return None
+ None
}
} finally {
// Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
var totalBytes = 0L
var totalTime = 0L
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
- writer.commit()
- writer.close()
+ writer.commitAndClose()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
private def revertWrites(): Unit = {
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
- writer.revertPartialWrites()
- writer.close()
+ writer.revertPartialWritesAndClose()
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 42fcd07fa1..9a356d0dba 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
for (elem <- elements) {
writer.write(elem)
}
- writer.commit()
- writer.close()
+ writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index a2687e6be4..01d46e1ffc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
def isOpen: Boolean
/**
- * Flush the partial writes and commit them as a single atomic block. Return the
- * number of bytes written for this commit.
+ * Flush the partial writes and commit them as a single atomic block.
*/
- def commit(): Long
+ def commitAndClose(): Unit
/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
- * when there are runtime exceptions.
+ * when there are runtime exceptions. This method will not throw, though it may be
+ * unsuccessful in truncating written data.
*/
- def revertPartialWrites()
+ def revertPartialWritesAndClose()
/**
* Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
/**
* Returns the file segment of committed data that this Writer has written.
+ * This is only valid after commitAndClose() has been called.
*/
def fileSegment(): FileSegment
@@ -108,7 +109,7 @@ private[spark] class DiskBlockObjectWriter(
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private val initialPosition = file.length()
- private var lastValidPosition = initialPosition
+ private var finalPosition: Long = -1
private var initialized = false
private var _timeWriting = 0L
@@ -116,7 +117,6 @@ private[spark] class DiskBlockObjectWriter(
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
- lastValidPosition = initialPosition
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
override def isOpen: Boolean = objOut != null
- override def commit(): Long = {
+ override def commitAndClose(): Unit = {
if (initialized) {
// NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
- val prevPos = lastValidPosition
- lastValidPosition = channel.position()
- lastValidPosition - prevPos
- } else {
- // lastValidPosition is zero if stream is uninitialized
- lastValidPosition
+ close()
}
+ finalPosition = file.length()
}
- override def revertPartialWrites() {
- if (initialized) {
- // Discard current writes. We do this by flushing the outstanding writes and
- // truncate the file to the last valid position.
- objOut.flush()
- bs.flush()
- channel.truncate(lastValidPosition)
+ // Discard current writes. We do this by flushing the outstanding writes and then
+ // truncating the file to its initial position.
+ override def revertPartialWritesAndClose() {
+ try {
+ if (initialized) {
+ objOut.flush()
+ bs.flush()
+ close()
+ }
+
+ val truncateStream = new FileOutputStream(file, true)
+ try {
+ truncateStream.getChannel.truncate(initialPosition)
+ } finally {
+ truncateStream.close()
+ }
+ } catch {
+ case e: Exception =>
+ logError("Uncaught exception while reverting partial writes to file " + file, e)
}
}
@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(
// Only valid if called after commit()
override def bytesWritten: Long = {
- lastValidPosition - initialPosition
+ assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
+ finalPosition - initialPosition
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 7beb55c411..28aa35bc7e 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
if (consolidateShuffleFiles) {
if (success) {
val offsets = writers.map(_.fileSegment().offset)
- fileGroup.recordMapOutput(mapId, offsets)
+ val lengths = writers.map(_.fileSegment().length)
+ fileGroup.recordMapOutput(mapId, offsets, lengths)
}
recycleFileGroup(fileGroup)
} else {
@@ -247,6 +248,8 @@ object ShuffleBlockManager {
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
*/
private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+ private var numBlocks: Int = 0
+
/**
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
@@ -254,23 +257,27 @@ object ShuffleBlockManager {
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
/**
- * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
- * This ordering allows us to compute block lengths by examining the following block offset.
+ * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+ * position in the file.
* Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
* reducer.
*/
private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}
-
- def numBlocks = mapIdToIndex.size
+ private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+ new PrimitiveVector[Long]()
+ }
def apply(bucketId: Int) = files(bucketId)
- def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+ def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+ assert(offsets.length == lengths.length)
mapIdToIndex(mapId) = numBlocks
+ numBlocks += 1
for (i <- 0 until offsets.length) {
blockOffsetsByReducer(i) += offsets(i)
+ blockLengthsByReducer(i) += lengths(i)
}
}
@@ -278,16 +285,11 @@ object ShuffleBlockManager {
def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
val file = files(reducerId)
val blockOffsets = blockOffsetsByReducer(reducerId)
+ val blockLengths = blockLengthsByReducer(reducerId)
val index = mapIdToIndex.getOrElse(mapId, -1)
if (index >= 0) {
val offset = blockOffsets(index)
- val length =
- if (index + 1 < numBlocks) {
- blockOffsets(index + 1) - offset
- } else {
- file.length() - offset
- }
- assert(length >= 0)
+ val length = blockLengths(index)
Some(new FileSegment(file, offset, length))
} else {
None
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index b34512ef9e..cb67a1c039 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Flush the disk writer's contents to disk, and update relevant variables
def flush() = {
- writer.commit()
+ writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 54c3310744..6e415a2bd8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
// How many elements we have in each partition
val elementsPerPartition = new Array[Long](numPartitions)
- // Flush the disk writer's contents to disk, and update relevant variables
+ // Flush the disk writer's contents to disk, and update relevant variables.
+ // The writer is closed at the end of this process, and cannot be reused.
def flush() = {
- writer.commit()
+ writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](
if (objectsWritten == serializerBatchSize) {
flush()
- writer.close()
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aaa7714049..985ac93947 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
import scala.collection.mutable
import scala.language.reflectiveCalls
+import akka.actor.Props
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AkkaUtils, Utils}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
newFile.delete()
}
+ private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+ assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+ assert (segment1.offset === segment2.offset)
+ assert (segment1.length === segment2.length)
+ }
+
+ test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
+
+ val serializer = new JavaSerializer(testConf)
+ val confCopy = testConf.clone
+ // reset after EACH object write. This is to ensure that there are bytes appended after
+ // an object is written. So if the codepaths assume writeObject is end of data, this should
+ // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
+ confCopy.set("spark.serializer.objectStreamReset", "1")
+
+ val securityManager = new org.apache.spark.SecurityManager(confCopy)
+ // Do not use the shuffleBlockManager above !
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
+ securityManager)
+ val master = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
+ confCopy)
+ val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
+ securityManager, null)
+
+ try {
+
+ val shuffleManager = store.shuffleBlockManager
+
+ val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
+ for (writer <- shuffle1.writers) {
+ writer.write("test1")
+ writer.write("test2")
+ }
+ for (writer <- shuffle1.writers) {
+ writer.commitAndClose()
+ }
+
+ val shuffle1Segment = shuffle1.writers(0).fileSegment()
+ shuffle1.releaseWriters(success = true)
+
+ val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))
+
+ for (writer <- shuffle2.writers) {
+ writer.write("test3")
+ writer.write("test4")
+ }
+ for (writer <- shuffle2.writers) {
+ writer.commitAndClose()
+ }
+ val shuffle2Segment = shuffle2.writers(0).fileSegment()
+ shuffle2.releaseWriters(success = true)
+
+ // Now comes the test :
+ // Write to shuffle 3; and close it, but before registering it, check if the file lengths for
+ // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length
+ // of block based on remaining data in file : which could mess things up when there is concurrent read
+ // and writes happening to the same shuffle group.
+
+ val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
+ for (writer <- shuffle3.writers) {
+ writer.write("test3")
+ writer.write("test4")
+ }
+ for (writer <- shuffle3.writers) {
+ writer.commitAndClose()
+ }
+ // check before we register.
+ checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+ shuffle3.releaseWriters(success = true)
+ checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+ shuffleManager.removeShuffle(1)
+ } finally {
+
+ if (store != null) {
+ store.stop()
+ }
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ }
+ }
+
def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
val segment = diskBlockManager.getBlockLocation(blockId)
assert(segment.file.getName === filename)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c35615a..8a05fcb449 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
for (i <- 1 to recordsPerMap) {
writers(i % numOutputSplits).write(writeData)
}
- writers.map {w =>
- w.commit()
+ writers.map { w =>
+ w.commitAndClose()
total.addAndGet(w.fileSegment().length)
- w.close()
}
shuffle.releaseWriters(true)