aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2016-11-14 16:46:26 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-11-14 16:46:26 -0800
commitc07187823a98f0d1a0f58c06e28a27e1abed157a (patch)
treee0838b92abc9aa3d271742ab8fbad1e760eb068c /common
parentbd85603ba5f9e61e1aa8326d3e4d5703b5977a4c (diff)
downloadspark-c07187823a98f0d1a0f58c06e28a27e1abed157a.tar.gz
spark-c07187823a98f0d1a0f58c06e28a27e1abed157a.tar.bz2
spark-c07187823a98f0d1a0f58c06e28a27e1abed157a.zip
[SPARK-18124] Observed delay based Event Time Watermarks
This PR adds a new method `withWatermark` to the `Dataset` API, which can be used specify an _event time watermark_. An event time watermark allows the streaming engine to reason about the point in time after which we no longer expect to see late data. This PR also has augmented `StreamExecution` to use this watermark for several purposes: - To know when a given time window aggregation is finalized and thus results can be emitted when using output modes that do not allow updates (e.g. `Append` mode). - To minimize the amount of state that we need to keep for on-going aggregations, by evicting state for groups that are no longer expected to change. Although, we do still maintain all state if the query requires (i.e. if the event time is not present in the `groupBy` or when running in `Complete` mode). An example that emits windowed counts of records, waiting up to 5 minutes for late data to arrive. ```scala df.withWatermark("eventTime", "5 minutes") .groupBy(window($"eventTime", "1 minute") as 'window) .count() .writeStream .format("console") .mode("append") // In append mode, we only output finalized aggregations. .start() ``` ### Calculating the watermark. The current event time is computed by looking at the `MAX(eventTime)` seen this epoch across all of the partitions in the query minus some user defined _delayThreshold_. An additional constraint is that the watermark must increase monotonically. Note that since we must coordinate this value across partitions occasionally, the actual watermark used is only guaranteed to be at least `delay` behind the actual event time. In some cases we may still process records that arrive more than delay late. This mechanism was chosen for the initial implementation over processing time for two reasons: - it is robust to downtime that could affect processing delay - it does not require syncing of time or timezones between the producer and the processing engine. ### Other notable implementation details - A new trigger metric `eventTimeWatermark` outputs the current value of the watermark. - We mark the event time column in the `Attribute` metadata using the key `spark.watermarkDelay`. This allows downstream operations to know which column holds the event time. Operations like `window` propagate this metadata. - `explain()` marks the watermark with a suffix of `-T${delayMs}` to ease debugging of how this information is propagated. - Currently, we don't filter out late records, but instead rely on the state store to avoid emitting records that are both added and filtered in the same epoch. ### Remaining in this PR - [ ] The test for recovery is currently failing as we don't record the watermark used in the offset log. We will need to do so to ensure determinism, but this is deferred until #15626 is merged. ### Other follow-ups There are some natural additional features that we should consider for future work: - Ability to write records that arrive too late to some external store in case any out-of-band remediation is required. - `Update` mode so you can get partial results before a group is evicted. - Other mechanisms for calculating the watermark. In particular a watermark based on quantiles would be more robust to outliers. Author: Michael Armbrust <michael@databricks.com> Closes #15702 from marmbrus/watermarks.
Diffstat (limited to 'common')
-rw-r--r--common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java4
1 files changed, 4 insertions, 0 deletions
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index 518ed6470a..a7b0e6f80c 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -252,6 +252,10 @@ public final class CalendarInterval implements Serializable {
public final int months;
public final long microseconds;
+ public final long milliseconds() {
+ return this.microseconds / MICROS_PER_MILLI;
+ }
+
public CalendarInterval(int months, long microseconds) {
this.months = months;
this.microseconds = microseconds;