aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2015-06-23 09:08:11 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-06-23 09:08:11 -0700
commit0f92be5b5f017b593bd29d4da7e89aad2b3adac2 (patch)
tree8a45697a1ff522cb60ff92781b5ad29a45262607
parent6ceb169608428a651d53c93bf73ca5ac53a6bde2 (diff)
downloadspark-0f92be5b5f017b593bd29d4da7e89aad2b3adac2.tar.gz
spark-0f92be5b5f017b593bd29d4da7e89aad2b3adac2.tar.bz2
spark-0f92be5b5f017b593bd29d4da7e89aad2b3adac2.zip
[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer
Author: Holden Karau <holden@pigscanfly.ca> Closes #6918 from holdenk/SPARK-8498-fix-npe-in-errorhandling-path-in-unsafeshuffle-writer and squashes the following commits: f807832 [Holden Karau] Log error if we can't throw it 855f9aa [Holden Karau] Spelling - not my strongest suite. Fix Propegates to Propagates. 039d620 [Holden Karau] Add missing closeandwriteoutput 30e558d [Holden Karau] go back to try/finally e503b8c [Holden Karau] Improve the test to ensure we aren't masking the underlying exception ae0b7a7 [Holden Karau] Fix the test 2e6abf7 [Holden Karau] Be more cautious when cleaning up during failed write and re-throw user exceptions
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java18
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java17
2 files changed, 33 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index ad7eb04afc..764578b181 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -139,6 +139,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
+ // Keep track of success so we know if we ecountered an exception
+ // We do this rather than a standard try/catch/re-throw to handle
+ // generic throwables.
boolean success = false;
try {
while (records.hasNext()) {
@@ -147,8 +150,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
closeAndWriteOutput();
success = true;
} finally {
- if (!success) {
- sorter.cleanupAfterError();
+ if (sorter != null) {
+ try {
+ sorter.cleanupAfterError();
+ } catch (Exception e) {
+ // Only throw this error if we won't be masking another
+ // error.
+ if (success) {
+ throw e;
+ } else {
+ logger.error("In addition to a failure during writing, we failed during " +
+ "cleanup.", e);
+ }
+ }
}
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 83d109115a..10c3eedbf4 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -253,6 +253,23 @@ public class UnsafeShuffleWriterSuite {
createWriter(false).stop(false);
}
+ class PandaException extends RuntimeException {
+ }
+
+ @Test(expected=PandaException.class)
+ public void writeFailurePropagates() throws Exception {
+ class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
+ @Override public boolean hasNext() {
+ throw new PandaException();
+ }
+ @Override public Product2<Object, Object> next() {
+ return null;
+ }
+ }
+ final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
+ writer.write(new BadRecords());
+ }
+
@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);