diff options
author | jerryshao <sshao@hortonworks.com> | 2017-01-23 13:36:41 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2017-01-23 13:36:41 -0800 |
commit | e4974721f33e64604501f673f74052e11920d438 (patch) | |
tree | b770a5fa5bd9b6039f499186898ef8c0f1f6edd9 /core | |
parent | 5b258b8b0752d13842d40ae69107f7976678cf17 (diff) | |
download | spark-e4974721f33e64604501f673f74052e11920d438.tar.gz spark-e4974721f33e64604501f673f74052e11920d438.tar.bz2 spark-e4974721f33e64604501f673f74052e11920d438.zip |
[SPARK-19306][CORE] Fix inconsistent state in DiskBlockObject when expection occurred
## What changes were proposed in this pull request?
In `DiskBlockObjectWriter`, when some errors happened during writing, it will call `revertPartialWritesAndClose`, if this method again failed due to some issues like out of disk, it will throw exception without resetting the state of this writer, also skipping the revert. So here propose to fix this issue to offer user a chance to recover from such issue.
## How was this patch tested?
Existing test.
Author: jerryshao <sshao@hortonworks.com>
Closes #16657 from jerryshao/SPARK-19306.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala | 44 |
1 files changed, 25 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 3cb12fca7d..eb3ff92637 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -128,16 +128,19 @@ private[spark] class DiskBlockObjectWriter( */ private def closeResources(): Unit = { if (initialized) { - mcs.manualClose() - channel = null - mcs = null - bs = null - fos = null - ts = null - objOut = null - initialized = false - streamOpen = false - hasBeenClosed = true + Utils.tryWithSafeFinally { + mcs.manualClose() + } { + channel = null + mcs = null + bs = null + fos = null + ts = null + objOut = null + initialized = false + streamOpen = false + hasBeenClosed = true + } } } @@ -199,26 +202,29 @@ private[spark] class DiskBlockObjectWriter( def revertPartialWritesAndClose(): File = { // Discard current writes. We do this by flushing the outstanding writes and then // truncating the file to its initial position. - try { + Utils.tryWithSafeFinally { if (initialized) { writeMetrics.decBytesWritten(reportedPosition - committedPosition) writeMetrics.decRecordsWritten(numRecordsWritten) streamOpen = false closeResources() } - - val truncateStream = new FileOutputStream(file, true) + } { + var truncateStream: FileOutputStream = null try { + truncateStream = new FileOutputStream(file, true) truncateStream.getChannel.truncate(committedPosition) - file + } catch { + case e: Exception => + logError("Uncaught exception while reverting partial writes to file " + file, e) } finally { - truncateStream.close() + if (truncateStream != null) { + truncateStream.close() + truncateStream = null + } } - } catch { - case e: Exception => - logError("Uncaught exception while reverting partial writes to file " + file, e) - file } + file } /** |