aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-11-21 21:36:08 -0800
committerAaron Davidson <aaron@databricks.com>2013-11-21 21:36:08 -0800
commitccea38b759c81abea27bc0a51157a31d369839b5 (patch)
treea88530b73540928a1e857ba11658331b8d297190
parente2ebc3a9d8bca83bf842b134f2f056c1af0ad2be (diff)
downloadspark-ccea38b759c81abea27bc0a51157a31d369839b5.tar.gz
spark-ccea38b759c81abea27bc0a51157a31d369839b5.tar.bz2
spark-ccea38b759c81abea27bc0a51157a31d369839b5.zip
Fix 'timeWriting' stat for shuffle files
Due to concurrent git branches, changes from shuffle file consolidation patch caused the shuffle write timing patch to no longer actually measure the time, since it requires time be measured after the stream has been closed.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 1dc71a0428..0f2deb4bcb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask(
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
+ writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
@@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask(
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
- if (shuffle != null) {
- shuffle.writers.foreach(_.revertPartialWrites())
+ if (shuffle != null && shuffle.writers != null) {
+ for (writer <- shuffle.writers) {
+ writer.revertPartialWrites()
+ writer.close()
+ }
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && shuffle.writers != null) {
- shuffle.writers.foreach(_.close())
shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.