diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-10-28 20:14:38 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-10-28 20:14:38 -0700 |
commit | 59cccbda489f25add3e10997e950de7e88704aa7 (patch) | |
tree | 4e8a8225a2129910f47f990ca858856a4a3ce374 /sql/core/src/main | |
parent | ac26e9cf27862fbfb97ae18d591606ecf2cd41cf (diff) | |
download | spark-59cccbda489f25add3e10997e950de7e88704aa7.tar.gz spark-59cccbda489f25add3e10997e950de7e88704aa7.tar.bz2 spark-59cccbda489f25add3e10997e950de7e88704aa7.zip |
[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception
## What changes were proposed in this pull request?
Fixed the issue that ForeachSink didn't rethrow the exception.
## How was this patch tested?
The fixed unit test.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #15674 from zsxwing/foreach-sink-error.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala | 7 |
1 files changed, 2 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala index 082664aa23..24f98b9211 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala @@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria } datasetWithIncrementalExecution.foreachPartition { iter => if (writer.open(TaskContext.getPartitionId(), batchId)) { - var isFailed = false try { while (iter.hasNext) { writer.process(iter.next()) } } catch { case e: Throwable => - isFailed = true writer.close(e) + throw e } - if (!isFailed) { - writer.close(null) - } + writer.close(null) } else { writer.close(null) } |