diff options
author | Holden Karau <holden@pigscanfly.ca> | 2015-06-23 09:08:11 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-06-23 09:08:11 -0700 |
commit | 0f92be5b5f017b593bd29d4da7e89aad2b3adac2 (patch) | |
tree | 8a45697a1ff522cb60ff92781b5ad29a45262607 /core/src | |
parent | 6ceb169608428a651d53c93bf73ca5ac53a6bde2 (diff) | |
download | spark-0f92be5b5f017b593bd29d4da7e89aad2b3adac2.tar.gz spark-0f92be5b5f017b593bd29d4da7e89aad2b3adac2.tar.bz2 spark-0f92be5b5f017b593bd29d4da7e89aad2b3adac2.zip |
[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer
Author: Holden Karau <holden@pigscanfly.ca>
Closes #6918 from holdenk/SPARK-8498-fix-npe-in-errorhandling-path-in-unsafeshuffle-writer and squashes the following commits:
f807832 [Holden Karau] Log error if we can't throw it
855f9aa [Holden Karau] Spelling - not my strongest suite. Fix Propegates to Propagates.
039d620 [Holden Karau] Add missing closeandwriteoutput
30e558d [Holden Karau] go back to try/finally
e503b8c [Holden Karau] Improve the test to ensure we aren't masking the underlying exception
ae0b7a7 [Holden Karau] Fix the test
2e6abf7 [Holden Karau] Be more cautious when cleaning up during failed write and re-throw user exceptions
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java | 18 | ||||
-rw-r--r-- | core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java | 17 |
2 files changed, 33 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index ad7eb04afc..764578b181 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -139,6 +139,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { @Override public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException { + // Keep track of success so we know if we ecountered an exception + // We do this rather than a standard try/catch/re-throw to handle + // generic throwables. boolean success = false; try { while (records.hasNext()) { @@ -147,8 +150,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { closeAndWriteOutput(); success = true; } finally { - if (!success) { - sorter.cleanupAfterError(); + if (sorter != null) { + try { + sorter.cleanupAfterError(); + } catch (Exception e) { + // Only throw this error if we won't be masking another + // error. + if (success) { + throw e; + } else { + logger.error("In addition to a failure during writing, we failed during " + + "cleanup.", e); + } + } } } } diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index 83d109115a..10c3eedbf4 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -253,6 +253,23 @@ public class UnsafeShuffleWriterSuite { createWriter(false).stop(false); } + class PandaException extends RuntimeException { + } + + @Test(expected=PandaException.class) + public void writeFailurePropagates() throws Exception { + class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> { + @Override public boolean hasNext() { + throw new PandaException(); + } + @Override public Product2<Object, Object> next() { + return null; + } + } + final UnsafeShuffleWriter<Object, Object> writer = createWriter(true); + writer.write(new BadRecords()); + } + @Test public void writeEmptyIterator() throws Exception { final UnsafeShuffleWriter<Object, Object> writer = createWriter(true); |