aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2016-11-14 16:46:26 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-11-14 16:46:26 -0800
commitc07187823a98f0d1a0f58c06e28a27e1abed157a (patch)
treee0838b92abc9aa3d271742ab8fbad1e760eb068c /sql/core
parentbd85603ba5f9e61e1aa8326d3e4d5703b5977a4c (diff)
downloadspark-c07187823a98f0d1a0f58c06e28a27e1abed157a.tar.gz
spark-c07187823a98f0d1a0f58c06e28a27e1abed157a.tar.bz2
spark-c07187823a98f0d1a0f58c06e28a27e1abed157a.zip
[SPARK-18124] Observed delay based Event Time Watermarks
This PR adds a new method `withWatermark` to the `Dataset` API, which can be used specify an _event time watermark_. An event time watermark allows the streaming engine to reason about the point in time after which we no longer expect to see late data. This PR also has augmented `StreamExecution` to use this watermark for several purposes: - To know when a given time window aggregation is finalized and thus results can be emitted when using output modes that do not allow updates (e.g. `Append` mode). - To minimize the amount of state that we need to keep for on-going aggregations, by evicting state for groups that are no longer expected to change. Although, we do still maintain all state if the query requires (i.e. if the event time is not present in the `groupBy` or when running in `Complete` mode). An example that emits windowed counts of records, waiting up to 5 minutes for late data to arrive. ```scala df.withWatermark("eventTime", "5 minutes") .groupBy(window($"eventTime", "1 minute") as 'window) .count() .writeStream .format("console") .mode("append") // In append mode, we only output finalized aggregations. .start() ``` ### Calculating the watermark. The current event time is computed by looking at the `MAX(eventTime)` seen this epoch across all of the partitions in the query minus some user defined _delayThreshold_. An additional constraint is that the watermark must increase monotonically. Note that since we must coordinate this value across partitions occasionally, the actual watermark used is only guaranteed to be at least `delay` behind the actual event time. In some cases we may still process records that arrive more than delay late. This mechanism was chosen for the initial implementation over processing time for two reasons: - it is robust to downtime that could affect processing delay - it does not require syncing of time or timezones between the producer and the processing engine. ### Other notable implementation details - A new trigger metric `eventTimeWatermark` outputs the current value of the watermark. - We mark the event time column in the `Attribute` metadata using the key `spark.watermarkDelay`. This allows downstream operations to know which column holds the event time. Operations like `window` propagate this metadata. - `explain()` marks the watermark with a suffix of `-T${delayMs}` to ease debugging of how this information is propagated. - Currently, we don't filter out late records, but instead rely on the state store to avoid emitting records that are both added and filtered in the same epoch. ### Remaining in this PR - [ ] The test for recovery is currently failing as we don't record the watermark used in the offset log. We will need to do so to ensure determinism, but this is deferred until #15626 is merged. ### Other follow-ups There are some natural additional features that we should consider for future work: - Ability to write records that arrive too late to some external store in case any out-of-band remediation is required. - `Update` mode so you can get partial results before a group is evicted. - Other mechanisms for calculating the watermark. In particular a watermark based on quantiles would be more robust to outliers. Author: Michael Armbrust <michael@databricks.com> Closes #15702 from marmbrus/watermarks.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala40
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala93
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala170
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala25
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala23
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala191
14 files changed, 490 insertions, 104 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index eb2b20afc3..af30683cc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils
private[sql] object Dataset {
@@ -476,7 +477,7 @@ class Dataset[T] private[sql](
* `collect()`, will throw an [[AnalysisException]] when there is a streaming
* source present.
*
- * @group basic
+ * @group streaming
* @since 2.0.0
*/
@Experimental
@@ -496,8 +497,6 @@ class Dataset[T] private[sql](
/**
* Returns a checkpointed version of this Dataset.
*
- * @param eager When true, materializes the underlying checkpointed RDD eagerly.
- *
* @group basic
* @since 2.1.0
*/
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
+ * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+ * before which we assume no more late data is going to arrive.
+ *
+ * Spark will use this watermark for several purposes:
+ * - To know when a given time window aggregation can be finalized and thus can be emitted when
+ * using output modes that do not allow updates.
+ * - To minimize the amount of state that we need to keep for on-going aggregations.
+ *
+ * The current watermark is computed by looking at the `MAX(eventTime)` seen across
+ * all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
+ * of coordinating this value across partitions, the actual watermark used is only guaranteed
+ * to be at least `delayThreshold` behind the actual event time. In some cases we may still
+ * process records that arrive more than `delayThreshold` late.
+ *
+ * @param eventTime the name of the column that contains the event time of the row.
+ * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
+ * record that has been processed in the form of an interval
+ * (e.g. "1 minute" or "5 hours").
+ *
+ * @group streaming
+ * @since 2.1.0
+ */
+ @Experimental
+ @InterfaceStability.Evolving
+ // We only accept an existing column name, not a derived column here as a watermark that is
+ // defined on a derived column cannot referenced elsewhere in the plan.
+ def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
+ val parsedDelay =
+ Option(CalendarInterval.fromString("interval " + delayThreshold))
+ .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+ EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
+ }
+
+ /**
* Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated,
* and all cells will be aligned right. For example:
* {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 190fdd8434..2308ae8a6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,20 +18,23 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{execution, SaveMode, Strategy}
+import org.apache.spark.sql.{SaveMode, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
-import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamingQuery
/**
* Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -224,6 +227,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
object StatefulAggregationStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case EventTimeWatermark(columnName, delay, child) =>
+ EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil
+
case PhysicalAggregation(
namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 3c8ef1ad84..8b8ccf4239 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -328,8 +328,13 @@ object AggUtils {
}
// Note: stateId and returnAllStates are filled in later with preparation rules
// in IncrementalExecution.
- val saved = StateStoreSaveExec(
- groupingAttributes, stateId = None, returnAllStates = None, partialMerged2)
+ val saved =
+ StateStoreSaveExec(
+ groupingAttributes,
+ stateId = None,
+ outputMode = None,
+ eventTimeWatermark = None,
+ partialMerged2)
val finalAndCompleteAggregate: SparkPlan = {
val finalAggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Final))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index d82e54e575..52d8dc22a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -104,7 +104,7 @@ case class ExplainCommand(
if (logicalPlan.isStreaming) {
// This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
// output mode does not matter since there is no `Sink`.
- new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0)
+ new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0, 0)
} else {
sparkSession.sessionState.executePlan(logicalPlan)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
new file mode 100644
index 0000000000..4c8cb069d2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.math.max
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+/** Tracks the maximum positive long seen. */
+class MaxLong(protected var currentValue: Long = 0)
+ extends AccumulatorV2[Long, Long] {
+
+ override def isZero: Boolean = value == 0
+ override def value: Long = currentValue
+ override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
+
+ override def reset(): Unit = {
+ currentValue = 0
+ }
+
+ override def add(v: Long): Unit = {
+ currentValue = max(v, value)
+ }
+
+ override def merge(other: AccumulatorV2[Long, Long]): Unit = {
+ currentValue = max(value, other.value)
+ }
+}
+
+/**
+ * Used to mark a column as the containing the event time for a given record. In addition to
+ * adding appropriate metadata to this column, this operator also tracks the maximum observed event
+ * time. Based on the maximum observed time and a user specified delay, we can calculate the
+ * `watermark` after which we assume we will no longer see late records for a particular time
+ * period.
+ */
+case class EventTimeWatermarkExec(
+ eventTime: Attribute,
+ delay: CalendarInterval,
+ child: SparkPlan) extends SparkPlan {
+
+ // TODO: Use Spark SQL Metrics?
+ val maxEventTime = new MaxLong
+ sparkContext.register(maxEventTime)
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ child.execute().mapPartitions { iter =>
+ val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
+ iter.map { row =>
+ maxEventTime.add(getEventTime(row).getLong(0))
+ row
+ }
+ }
+ }
+
+ // Update the metadata on the eventTime column to include the desired delay.
+ override val output: Seq[Attribute] = child.output.map { a =>
+ if (a semanticEquals eventTime) {
+ val updatedMetadata = new MetadataBuilder()
+ .withMetadata(a.metadata)
+ .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+ .build()
+
+ a.withMetadata(updatedMetadata)
+ } else {
+ a
+ }
+ }
+
+ override def children: Seq[SparkPlan] = child :: Nil
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 24f98b9211..f5c550dd6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -60,7 +60,8 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
deserialized,
data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
- data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+ data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId,
+ data.queryExecution.asInstanceOf[IncrementalExecution].currentEventTimeWatermark)
incrementalExecution.toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType))
}.asInstanceOf[RDD[T]]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 05294df267..e9d072f8a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -32,11 +32,13 @@ class IncrementalExecution(
logicalPlan: LogicalPlan,
val outputMode: OutputMode,
val checkpointLocation: String,
- val currentBatchId: Long)
+ val currentBatchId: Long,
+ val currentEventTimeWatermark: Long)
extends QueryExecution(sparkSession, logicalPlan) {
// TODO: make this always part of planning.
- val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+ val stateStrategy =
+ sparkSession.sessionState.planner.StatefulAggregationStrategy +:
sparkSession.sessionState.planner.StreamingRelationStrategy +:
sparkSession.sessionState.experimentalMethods.extraStrategies
@@ -57,17 +59,17 @@ class IncrementalExecution(
val state = new Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan transform {
- case StateStoreSaveExec(keys, None, None,
+ case StateStoreSaveExec(keys, None, None, None,
UnaryExecNode(agg,
StateStoreRestoreExec(keys2, None, child))) =>
val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
- val returnAllStates = if (outputMode == InternalOutputModes.Complete) true else false
operatorId += 1
StateStoreSaveExec(
keys,
Some(stateId),
- Some(returnAllStates),
+ Some(outputMode),
+ Some(currentEventTimeWatermark),
agg.withNewChildren(
StateStoreRestoreExec(
keys,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index ad8238f189..7af978a9c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -21,12 +21,17 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution
+import org.apache.spark.sql.InternalOutputModes._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
/** Used to identify the state store for a given operator. */
case class OperatorStateId(
@@ -92,8 +97,9 @@ case class StateStoreRestoreExec(
*/
case class StateStoreSaveExec(
keyExpressions: Seq[Attribute],
- stateId: Option[OperatorStateId],
- returnAllStates: Option[Boolean],
+ stateId: Option[OperatorStateId] = None,
+ outputMode: Option[OutputMode] = None,
+ eventTimeWatermark: Option[Long] = None,
child: SparkPlan)
extends execution.UnaryExecNode with StatefulOperator {
@@ -104,9 +110,9 @@ case class StateStoreSaveExec(
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
- assert(returnAllStates.nonEmpty,
- "Incorrect planning in IncrementalExecution, returnAllStates have not been set")
- val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ else saveAndReturnUpdated _
+ assert(outputMode.nonEmpty,
+ "Incorrect planning in IncrementalExecution, outputMode has not been set")
+
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
@@ -114,75 +120,95 @@ case class StateStoreSaveExec(
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
- Some(sqlContext.streams.stateStoreCoordinator)
- )(saveAndReturnFunc)
+ Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+ val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+ val numOutputRows = longMetric("numOutputRows")
+ val numTotalStateRows = longMetric("numTotalStateRows")
+ val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+ outputMode match {
+ // Update and output all rows in the StateStore.
+ case Some(Complete) =>
+ while (iter.hasNext) {
+ val row = iter.next().asInstanceOf[UnsafeRow]
+ val key = getKey(row)
+ store.put(key.copy(), row.copy())
+ numUpdatedStateRows += 1
+ }
+ store.commit()
+ numTotalStateRows += store.numKeys()
+ store.iterator().map { case (k, v) =>
+ numOutputRows += 1
+ v.asInstanceOf[InternalRow]
+ }
+
+ // Update and output only rows being evicted from the StateStore
+ case Some(Append) =>
+ while (iter.hasNext) {
+ val row = iter.next().asInstanceOf[UnsafeRow]
+ val key = getKey(row)
+ store.put(key.copy(), row.copy())
+ numUpdatedStateRows += 1
+ }
+
+ val watermarkAttribute =
+ keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+ // If we are evicting based on a window, use the end of the window. Otherwise just
+ // use the attribute itself.
+ val evictionExpression =
+ if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+ LessThanOrEqual(
+ GetStructField(watermarkAttribute, 1),
+ Literal(eventTimeWatermark.get * 1000))
+ } else {
+ LessThanOrEqual(
+ watermarkAttribute,
+ Literal(eventTimeWatermark.get * 1000))
+ }
+
+ logInfo(s"Filtering state store on: $evictionExpression")
+ val predicate = newPredicate(evictionExpression, keyExpressions)
+ store.remove(predicate.eval)
+
+ store.commit()
+
+ numTotalStateRows += store.numKeys()
+ store.updates().filter(_.isInstanceOf[ValueRemoved]).map { removed =>
+ numOutputRows += 1
+ removed.value.asInstanceOf[InternalRow]
+ }
+
+ // Update and output modified rows from the StateStore.
+ case Some(Update) =>
+ new Iterator[InternalRow] {
+ private[this] val baseIterator = iter
+
+ override def hasNext: Boolean = {
+ if (!baseIterator.hasNext) {
+ store.commit()
+ numTotalStateRows += store.numKeys()
+ false
+ } else {
+ true
+ }
+ }
+
+ override def next(): InternalRow = {
+ val row = baseIterator.next().asInstanceOf[UnsafeRow]
+ val key = getKey(row)
+ store.put(key.copy(), row.copy())
+ numOutputRows += 1
+ numUpdatedStateRows += 1
+ row
+ }
+ }
+
+ case _ => throw new UnsupportedOperationException(s"Invalid output mode: $outputMode")
+ }
+ }
}
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
-
- /**
- * Save all the rows to the state store, and return all the rows in the state store.
- * Note that this returns an iterator that pipelines the saving to store with downstream
- * processing.
- */
- private def saveAndReturnUpdated(
- store: StateStore,
- iter: Iterator[InternalRow]): Iterator[InternalRow] = {
- val numOutputRows = longMetric("numOutputRows")
- val numTotalStateRows = longMetric("numTotalStateRows")
- val numUpdatedStateRows = longMetric("numUpdatedStateRows")
-
- new Iterator[InternalRow] {
- private[this] val baseIterator = iter
- private[this] val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
-
- override def hasNext: Boolean = {
- if (!baseIterator.hasNext) {
- store.commit()
- numTotalStateRows += store.numKeys()
- false
- } else {
- true
- }
- }
-
- override def next(): InternalRow = {
- val row = baseIterator.next().asInstanceOf[UnsafeRow]
- val key = getKey(row)
- store.put(key.copy(), row.copy())
- numOutputRows += 1
- numUpdatedStateRows += 1
- row
- }
- }
- }
-
- /**
- * Save all the rows to the state store, and return all the rows in the state store.
- * Note that the saving to store is blocking; only after all the rows have been saved
- * is the iterator on the update store data is generated.
- */
- private def saveAndReturnAll(
- store: StateStore,
- iter: Iterator[InternalRow]): Iterator[InternalRow] = {
- val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
- val numOutputRows = longMetric("numOutputRows")
- val numTotalStateRows = longMetric("numTotalStateRows")
- val numUpdatedStateRows = longMetric("numUpdatedStateRows")
-
- while (iter.hasNext) {
- val row = iter.next().asInstanceOf[UnsafeRow]
- val key = getKey(row)
- store.put(key.copy(), row.copy())
- numUpdatedStateRows += 1
- }
- store.commit()
- numTotalStateRows += store.numKeys()
- store.iterator().map { case (k, v) =>
- numOutputRows += 1
- v.asInstanceOf[InternalRow]
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 57e89f8536..3ca6feac05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -92,6 +92,9 @@ class StreamExecution(
/** The current batchId or -1 if execution has not yet been initialized. */
private var currentBatchId: Long = -1
+ /** The current eventTime watermark, used to bound the lateness of data that will processed. */
+ private var currentEventTimeWatermark: Long = 0
+
/** All stream sources present in the query plan. */
private val sources =
logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
@@ -427,7 +430,8 @@ class StreamExecution(
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
- currentBatchId)
+ currentBatchId,
+ currentEventTimeWatermark)
lastExecution.executedPlan // Force the lazy generation of execution plan
}
@@ -436,6 +440,25 @@ class StreamExecution(
sink.addBatch(currentBatchId, nextBatch)
reportNumRows(executedPlan, triggerLogicalPlan, newData)
+ // Update the eventTime watermark if we find one in the plan.
+ // TODO: Does this need to be an AttributeMap?
+ lastExecution.executedPlan.collect {
+ case e: EventTimeWatermarkExec =>
+ logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
+ (e.maxEventTime.value / 1000) - e.delay.milliseconds()
+ }.headOption.foreach { newWatermark =>
+ if (newWatermark > currentEventTimeWatermark) {
+ logInfo(s"Updating eventTime watermark to: $newWatermark ms")
+ currentEventTimeWatermark = newWatermark
+ } else {
+ logTrace(s"Event time didn't move: $newWatermark < $currentEventTimeWatermark")
+ }
+
+ if (newWatermark != 0) {
+ streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
+ }
+ }
+
awaitBatchLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
index e98d1883e4..5645554a58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
@@ -221,6 +221,7 @@ object StreamMetrics extends Logging {
val IS_TRIGGER_ACTIVE = "isTriggerActive"
val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
val STATUS_MESSAGE = "statusMessage"
+ val EVENT_TIME_WATERMARK = "eventTimeWatermark"
val START_TIMESTAMP = "timestamp.triggerStart"
val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index f07feaad5d..493fdaaec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -109,7 +109,7 @@ private[state] class HDFSBackedStateStoreProvider(
case Some(ValueAdded(_, _)) =>
// Value did not exist in previous version and was added already, keep it marked as added
allUpdates.put(key, ValueAdded(key, value))
- case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) =>
+ case Some(ValueUpdated(_, _)) | Some(ValueRemoved(_, _)) =>
// Value existed in previous version and updated/removed, mark it as updated
allUpdates.put(key, ValueUpdated(key, value))
case None =>
@@ -124,24 +124,25 @@ private[state] class HDFSBackedStateStoreProvider(
/** Remove keys that match the following condition */
override def remove(condition: UnsafeRow => Boolean): Unit = {
verify(state == UPDATING, "Cannot remove after already committed or aborted")
-
- val keyIter = mapToUpdate.keySet().iterator()
- while (keyIter.hasNext) {
- val key = keyIter.next
- if (condition(key)) {
- keyIter.remove()
+ val entryIter = mapToUpdate.entrySet().iterator()
+ while (entryIter.hasNext) {
+ val entry = entryIter.next
+ if (condition(entry.getKey)) {
+ val value = entry.getValue
+ val key = entry.getKey
+ entryIter.remove()
Option(allUpdates.get(key)) match {
case Some(ValueUpdated(_, _)) | None =>
// Value existed in previous version and maybe was updated, mark removed
- allUpdates.put(key, KeyRemoved(key))
+ allUpdates.put(key, ValueRemoved(key, value))
case Some(ValueAdded(_, _)) =>
// Value did not exist in previous version and was added, should not appear in updates
allUpdates.remove(key)
- case Some(KeyRemoved(_)) =>
+ case Some(ValueRemoved(_, _)) =>
// Remove already in update map, no need to change
}
- writeToDeltaFile(tempDeltaFileStream, KeyRemoved(key))
+ writeToDeltaFile(tempDeltaFileStream, ValueRemoved(key, value))
}
}
}
@@ -334,7 +335,7 @@ private[state] class HDFSBackedStateStoreProvider(
writeUpdate(key, value)
case ValueUpdated(key, value) =>
writeUpdate(key, value)
- case KeyRemoved(key) =>
+ case ValueRemoved(key, value) =>
writeRemove(key)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 7132e284c2..9bc6c0e2b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -99,13 +99,16 @@ trait StateStoreProvider {
/** Trait representing updates made to a [[StateStore]]. */
-sealed trait StoreUpdate
+sealed trait StoreUpdate {
+ def key: UnsafeRow
+ def value: UnsafeRow
+}
case class ValueAdded(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
case class ValueUpdated(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
-case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
+case class ValueRemoved(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 533cd0cd2a..05fc7345a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -668,11 +668,11 @@ private[state] object StateStoreSuite {
}
def updatesToSet(iterator: Iterator[StoreUpdate]): Set[TestUpdate] = {
- iterator.map { _ match {
+ iterator.map {
case ValueAdded(key, value) => Added(rowToString(key), rowToInt(value))
case ValueUpdated(key, value) => Updated(rowToString(key), rowToInt(value))
- case KeyRemoved(key) => Removed(rowToString(key))
- }}.toSet
+ case ValueRemoved(key, _) => Removed(rowToString(key))
+ }.toSet
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
new file mode 100644
index 0000000000..3617ec0f56
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
+
+class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
+
+ import testImplicits._
+
+ after {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+ test("error on bad column") {
+ val inputData = MemoryStream[Int].toDF()
+ val e = intercept[AnalysisException] {
+ inputData.withWatermark("badColumn", "1 minute")
+ }
+ assert(e.getMessage contains "badColumn")
+ }
+
+ test("error on wrong type") {
+ val inputData = MemoryStream[Int].toDF()
+ val e = intercept[AnalysisException] {
+ inputData.withWatermark("value", "1 minute")
+ }
+ assert(e.getMessage contains "value")
+ assert(e.getMessage contains "int")
+ }
+
+
+ test("watermark metric") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 15),
+ AssertOnLastQueryStatus { status =>
+ status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+ },
+ AddData(inputData, 15),
+ AssertOnLastQueryStatus { status =>
+ status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+ },
+ AddData(inputData, 25),
+ AssertOnLastQueryStatus { status =>
+ status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "15000"
+ }
+ )
+ }
+
+ test("append-mode watermark aggregation") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10, 11, 12, 13, 14, 15),
+ CheckAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckAnswer(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ CheckAnswer((10, 5))
+ )
+ }
+
+ ignore("recovery") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10, 11, 12, 13, 14, 15),
+ CheckAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ StopStream,
+ StartStream(),
+ CheckAnswer(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ StopStream,
+ StartStream(),
+ CheckAnswer((10, 5))
+ )
+ }
+
+ test("dropping old data") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10, 11, 12),
+ CheckAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckAnswer(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ CheckAnswer((10, 3)),
+ AddData(inputData, 10), // 10 is later than 15 second watermark
+ CheckAnswer((10, 3)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 3)) // Should not emit an incorrect partial result.
+ )
+ }
+
+ test("complete mode") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ // No eviction when asked to compute complete results.
+ testStream(windowedAggregation, OutputMode.Complete)(
+ AddData(inputData, 10, 11, 12),
+ CheckAnswer((10, 3)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 3), (25, 1)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 3), (25, 2)),
+ AddData(inputData, 10),
+ CheckAnswer((10, 4), (25, 2)),
+ AddData(inputData, 25),
+ CheckAnswer((10, 4), (25, 3))
+ )
+ }
+
+ test("group by on raw timestamp") {
+ val inputData = MemoryStream[Int]
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy($"eventTime")
+ .agg(count("*") as 'count)
+ .select($"eventTime".cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10),
+ CheckAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckAnswer(),
+ AddData(inputData, 25), // Evict items less than previous watermark.
+ CheckAnswer((10, 1))
+ )
+ }
+}