diff options
author | Aaron Davidson <aaron@databricks.com> | 2013-11-21 21:36:08 -0800 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2013-11-21 21:36:08 -0800 |
commit | ccea38b759c81abea27bc0a51157a31d369839b5 (patch) | |
tree | a88530b73540928a1e857ba11658331b8d297190 | |
parent | e2ebc3a9d8bca83bf842b134f2f056c1af0ad2be (diff) | |
download | spark-ccea38b759c81abea27bc0a51157a31d369839b5.tar.gz spark-ccea38b759c81abea27bc0a51157a31d369839b5.tar.bz2 spark-ccea38b759c81abea27bc0a51157a31d369839b5.zip |
Fix 'timeWriting' stat for shuffle files
Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 1dc71a0428..0f2deb4bcb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask( var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() + writer.close() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() @@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask( } catch { case e: Exception => // If there is an exception from running the task, revert the partial writes // and throw the exception upstream to Spark. - if (shuffle != null) { - shuffle.writers.foreach(_.revertPartialWrites()) + if (shuffle != null && shuffle.writers != null) { + for (writer <- shuffle.writers) { + writer.revertPartialWrites() + writer.close() + } } throw e } finally { // Release the writers back to the shuffle block manager. if (shuffle != null && shuffle.writers != null) { - shuffle.writers.foreach(_.close()) shuffle.releaseWriters(success) } // Execute the callbacks on task completion. |