diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-11-15 15:12:30 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-11-15 15:12:30 -0800 |
commit | 1ae4652b7e1f77a984b8459c778cb06c814192c5 (patch) | |
tree | 7a9c1d234eba7cf5212fd9d15c85aa03d590817e /streaming/src | |
parent | 5bcb9a7ff4bdd7dac75481a951cd7da2133a2e2d (diff) | |
download | spark-1ae4652b7e1f77a984b8459c778cb06c814192c5.tar.gz spark-1ae4652b7e1f77a984b8459c778cb06c814192c5.tar.bz2 spark-1ae4652b7e1f77a984b8459c778cb06c814192c5.zip |
[SPARK-18440][STRUCTURED STREAMING] Pass correct query execution to FileFormatWriter
## What changes were proposed in this pull request?
SPARK-18012 refactored the file write path in FileStreamSink using FileFormatWriter which always uses the default non-streaming QueryExecution to perform the writes. This is wrong for FileStreamSink, because the streaming QueryExecution (i.e. IncrementalExecution) should be used for correctly incrementalizing aggregation. The addition of watermarks in SPARK-18124, file stream sink should logically supports aggregation + watermark + append mode. But actually it fails with
```
16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error
java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#7: timestamp, interval 10 seconds
+- LocalRelation [timestamp#7]
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
```
This PR fixes it by passing the correct query execution.
## How was this patch tested?
New unit test
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #15885 from tdas/SPARK-18440.
Diffstat (limited to 'streaming/src')
0 files changed, 0 insertions, 0 deletions