author    jerryshao <sshao@hortonworks.com>    2017-01-23 13:36:41 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>    2017-01-23 13:36:41 -0800
commit    e4974721f33e64604501f673f74052e11920d438 (patch)
tree      b770a5fa5bd9b6039f499186898ef8c0f1f6edd9 /core
parent    5b258b8b0752d13842d40ae69107f7976678cf17 (diff)
[SPARK-19306][CORE] Fix inconsistent state in DiskBlockObjectWriter when exception occurred
## What changes were proposed in this pull request?

In `DiskBlockObjectWriter`, when an error occurs during writing, `revertPartialWritesAndClose` is called. If that method itself then fails (for example because the disk is full), it throws an exception without resetting the writer's state, and the revert is skipped. This change fixes the issue so the caller gets a chance to recover from such a failure.

## How was this patch tested?

Existing tests.

Author: jerryshao <sshao@hortonworks.com>

Closes #16657 from jerryshao/SPARK-19306.
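The fix leans on Spark's `Utils.tryWithSafeFinally` helper (in `org.apache.spark.util.Utils`), which runs a cleanup block even when the main block throws, attaching any secondary exception as suppressed rather than letting it mask the original. A minimal sketch of that helper, simplified from the real version (which additionally logs the suppressed failure):

def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
  var originalThrowable: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      originalThrowable = t
      throw t
  } finally {
    try {
      finallyBlock
    } catch {
      // If both blocks throw, keep the original exception and attach the
      // cleanup failure as suppressed; if only the cleanup throws, the
      // guard below does not match and the cleanup exception propagates.
      case t: Throwable if originalThrowable != null =>
        originalThrowable.addSuppressed(t)
    }
  }
}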
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala | 44
1 file changed, 25 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 3cb12fca7d..eb3ff92637 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -128,16 +128,19 @@ private[spark] class DiskBlockObjectWriter(
*/
private def closeResources(): Unit = {
if (initialized) {
- mcs.manualClose()
- channel = null
- mcs = null
- bs = null
- fos = null
- ts = null
- objOut = null
- initialized = false
- streamOpen = false
- hasBeenClosed = true
+ Utils.tryWithSafeFinally {
+ mcs.manualClose()
+ } {
+ channel = null
+ mcs = null
+ bs = null
+ fos = null
+ ts = null
+ objOut = null
+ initialized = false
+ streamOpen = false
+ hasBeenClosed = true
+ }
}
}
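With closeResources wrapped this way, the writer's fields and flags are reset even if mcs.manualClose() throws, so a subsequent close() or revert no longer sees stale state. Building on the tryWithSafeFinally sketch above, a small hypothetical demonstration of that guarantee:

var cleanedUp = false
try {
  tryWithSafeFinally {
    throw new java.io.IOException("simulated manualClose failure")
  } {
    cleanedUp = true // stands in for the field resets in closeResources
  }
} catch {
  case e: java.io.IOException =>
    // The original exception still surfaces, but the cleanup has run.
    println(s"caught: ${e.getMessage}; cleanedUp = $cleanedUp") // true
}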
@@ -199,26 +202,29 @@ private[spark] class DiskBlockObjectWriter(
def revertPartialWritesAndClose(): File = {
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
- try {
+ Utils.tryWithSafeFinally {
if (initialized) {
writeMetrics.decBytesWritten(reportedPosition - committedPosition)
writeMetrics.decRecordsWritten(numRecordsWritten)
streamOpen = false
closeResources()
}
-
- val truncateStream = new FileOutputStream(file, true)
+ } {
+ var truncateStream: FileOutputStream = null
try {
+ truncateStream = new FileOutputStream(file, true)
truncateStream.getChannel.truncate(committedPosition)
- file
+ } catch {
+ case e: Exception =>
+ logError("Uncaught exception while reverting partial writes to file " + file, e)
} finally {
- truncateStream.close()
+ if (truncateStream != null) {
+ truncateStream.close()
+ truncateStream = null
+ }
}
- } catch {
- case e: Exception =>
- logError("Uncaught exception while reverting partial writes to file " + file, e)
- file
}
+ file
}
/**
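Taken together, the two hunks mean revertPartialWritesAndClose() now always resets the writer's state and still attempts the truncate, logging (rather than rethrowing) a truncate failure. A hypothetical caller-side pattern this enables is sketched below; writeAll and its records parameter are illustrative and not part of Spark:

import java.io.File
import org.apache.spark.storage.DiskBlockObjectWriter

def writeAll(writer: DiskBlockObjectWriter, records: Iterator[(Any, Any)]): Option[File] = {
  try {
    records.foreach { case (k, v) => writer.write(k, v) }
    // commitAndGet() returns a FileSegment describing the committed bytes.
    Some(writer.commitAndGet().file)
  } catch {
    case _: Exception =>
      // Post-patch, this truncates the file back to the last committed
      // position AND leaves the writer in a consistent closed state, even
      // if the truncate itself fails, so the caller can retry or fall back.
      writer.revertPartialWritesAndClose()
      None
  }
}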