diff options
Diffstat (limited to 'core/src')
3 files changed, 31 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 296179b75b..085829af6e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1111,9 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } - } { - writer.close(hadoopContext) - } + }(finallyBlock = writer.close(hadoopContext)) committer.commitTask(hadoopContext) outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => om.setBytesWritten(callback()) @@ -1200,9 +1198,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } - } { - writer.close() - } + }(finallyBlock = writer.close()) writer.commit() outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => om.setBytesWritten(callback()) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 46c64f61de..c91d8fbfc4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -80,10 +80,16 @@ private[spark] abstract class Task[T]( } try { runTask(context) - } catch { case e: Throwable => - // Catch all errors; run task failure callbacks, and rethrow the exception. - context.markTaskFailed(e) - throw e + } catch { + case e: Throwable => + // Catch all errors; run task failure callbacks, and rethrow the exception. + try { + context.markTaskFailed(e) + } catch { + case t: Throwable => + e.addSuppressed(t) + } + throw e } finally { // Call the task completion callbacks. context.markTaskCompleted() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c304629bcd..78e164cff7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1260,26 +1260,35 @@ private[spark] object Utils extends Logging { } /** - * Execute a block of code, call the failure callbacks before finally block if there is any - * exceptions happen. But if exceptions happen in the finally block, do not suppress the original - * exception. + * Execute a block of code and call the failure callbacks in the catch block. If exceptions occur + * in either the catch or the finally block, they are appended to the list of suppressed + * exceptions in original exception which is then rethrown. * - * This is primarily an issue with `finally { out.close() }` blocks, where - * close needs to be called to clean up `out`, but if an exception happened - * in `out.write`, it's likely `out` may be corrupted and `out.close` will + * This is primarily an issue with `catch { abort() }` or `finally { out.close() }` blocks, + * where the abort/close needs to be called to clean up `out`, but if an exception happened + * in `out.write`, it's likely `out` may be corrupted and `abort` or `out.close` will * fail as well. This would then suppress the original/likely more meaningful * exception from the original `out.write` call. */ - def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(finallyBlock: => Unit): T = { + def tryWithSafeFinallyAndFailureCallbacks[T](block: => T) + (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = { var originalThrowable: Throwable = null try { block } catch { - case t: Throwable => + case cause: Throwable => // Purposefully not using NonFatal, because even fatal exceptions // we don't want to have our finallyBlock suppress - originalThrowable = t - TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(t) + originalThrowable = cause + try { + logError("Aborting task", originalThrowable) + TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable) + catchBlock + } catch { + case t: Throwable => + originalThrowable.addSuppressed(t) + logWarning(s"Suppressing exception in catch: " + t.getMessage, t) + } throw originalThrowable } finally { try { |