aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-11-15 15:12:30 -0800
committerMichael Armbrust <michael@databricks.com>2016-11-15 15:12:30 -0800
commit1ae4652b7e1f77a984b8459c778cb06c814192c5 (patch)
tree7a9c1d234eba7cf5212fd9d15c85aa03d590817e /streaming/src
parent5bcb9a7ff4bdd7dac75481a951cd7da2133a2e2d (diff)
downloadspark-1ae4652b7e1f77a984b8459c778cb06c814192c5.tar.gz
spark-1ae4652b7e1f77a984b8459c778cb06c814192c5.tar.bz2
spark-1ae4652b7e1f77a984b8459c778cb06c814192c5.zip
[SPARK-18440][STRUCTURED STREAMING] Pass correct query execution to FileFormatWriter
## What changes were proposed in this pull request? SPARK-18012 refactored the file write path in FileStreamSink using FileFormatWriter which always uses the default non-streaming QueryExecution to perform the writes. This is wrong for FileStreamSink, because the streaming QueryExecution (i.e. IncrementalExecution) should be used for correctly incrementalizing aggregation. The addition of watermarks in SPARK-18124, file stream sink should logically supports aggregation + watermark + append mode. But actually it fails with ``` 16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#7: timestamp, interval 10 seconds +- LocalRelation [timestamp#7] at scala.Predef$.assert(Predef.scala:170) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74) ``` This PR fixes it by passing the correct query execution. ## How was this patch tested? New unit test Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15885 from tdas/SPARK-18440.
Diffstat (limited to 'streaming/src')
0 files changed, 0 insertions, 0 deletions