aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-10-28 20:14:38 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-10-28 20:14:38 -0700
commit59cccbda489f25add3e10997e950de7e88704aa7 (patch)
tree4e8a8225a2129910f47f990ca858856a4a3ce374 /sql/core/src/main
parentac26e9cf27862fbfb97ae18d591606ecf2cd41cf (diff)
downloadspark-59cccbda489f25add3e10997e950de7e88704aa7.tar.gz
spark-59cccbda489f25add3e10997e950de7e88704aa7.tar.bz2
spark-59cccbda489f25add3e10997e950de7e88704aa7.zip
[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception
## What changes were proposed in this pull request? Fixed the issue that ForeachSink didn't rethrow the exception. ## How was this patch tested? The fixed unit test. Author: Shixiong Zhu <shixiong@databricks.com> Closes #15674 from zsxwing/foreach-sink-error.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala7
1 files changed, 2 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 082664aa23..24f98b9211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
}
datasetWithIncrementalExecution.foreachPartition { iter =>
if (writer.open(TaskContext.getPartitionId(), batchId)) {
- var isFailed = false
try {
while (iter.hasNext) {
writer.process(iter.next())
}
} catch {
case e: Throwable =>
- isFailed = true
writer.close(e)
+ throw e
}
- if (!isFailed) {
- writer.close(null)
- }
+ writer.close(null)
} else {
writer.close(null)
}