aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-03-21 21:27:08 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-03-21 21:27:08 -0700
commitc1e87e384d1878308b42da80bb3d65be512aab55 (patch)
treeec8dd098b4e3daffa0cddfc786d9eb45f0ce05e6 /sql/core/src/main/scala/org/apache
parent2d73fcced0492c606feab8fe84f62e8318ebcaa1 (diff)
downloadspark-c1e87e384d1878308b42da80bb3d65be512aab55.tar.gz
spark-c1e87e384d1878308b42da80bb3d65be512aab55.tar.bz2
spark-c1e87e384d1878308b42da80bb3d65be512aab55.zip
[SPARK-20030][SS] Event-time-based timeout for MapGroupsWithState
## What changes were proposed in this pull request? Adding event time based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. The keys times out when the watermark crosses the timeout timestamp. ## How was this patch tested? Unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17361 from tdas/SPARK-20030.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala87
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala139
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala97
6 files changed, 255 insertions, 90 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9e58e8ce3d..ca2f6dd7a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -336,8 +336,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
timeout, child) =>
val execPlan = FlatMapGroupsWithStateExec(
func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, None, stateEnc, outputMode,
- timeout, batchTimestampMs = KeyedStateImpl.NO_BATCH_PROCESSING_TIMESTAMP,
- planLater(child))
+ timeout, batchTimestampMs = None, eventTimeWatermark = None, planLater(child))
execPlan :: Nil
case _ =>
Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 991d8ef707..52ad70c7dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, Literal, SortOrder, SpecificInternalRow, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalKeyedState, ProcessingTimeTimeout}
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, Literal, SortOrder, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.streaming.KeyedStateImpl.NO_TIMESTAMP
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode}
-import org.apache.spark.sql.types.{BooleanType, IntegerType}
+import org.apache.spark.sql.types.IntegerType
import org.apache.spark.util.CompletionIterator
/**
@@ -39,7 +40,7 @@ import org.apache.spark.util.CompletionIterator
* @param outputObjAttr used to define the output object
* @param stateEncoder used to serialize/deserialize state before calling `func`
* @param outputMode the output mode of `func`
- * @param timeout used to timeout groups that have not received data in a while
+ * @param timeoutConf used to timeout groups that have not received data in a while
* @param batchTimestampMs processing timestamp of the current batch.
*/
case class FlatMapGroupsWithStateExec(
@@ -52,11 +53,15 @@ case class FlatMapGroupsWithStateExec(
stateId: Option[OperatorStateId],
stateEncoder: ExpressionEncoder[Any],
outputMode: OutputMode,
- timeout: KeyedStateTimeout,
- batchTimestampMs: Long,
- child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
+ timeoutConf: KeyedStateTimeout,
+ batchTimestampMs: Option[Long],
+ override val eventTimeWatermark: Option[Long],
+ child: SparkPlan
+ ) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter with WatermarkSupport {
- private val isTimeoutEnabled = timeout == ProcessingTimeTimeout
+ import KeyedStateImpl._
+
+ private val isTimeoutEnabled = timeoutConf != NoTimeout
private val timestampTimeoutAttribute =
AttributeReference("timeoutTimestamp", dataType = IntegerType, nullable = false)()
private val stateAttributes: Seq[Attribute] = {
@@ -64,8 +69,6 @@ case class FlatMapGroupsWithStateExec(
if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
}
- import KeyedStateImpl._
-
/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(groupingAttributes) :: Nil
@@ -74,9 +77,21 @@ case class FlatMapGroupsWithStateExec(
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+ override def keyExpressions: Seq[Attribute] = groupingAttributes
+
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
+ // Throw errors early if parameters are not as expected
+ timeoutConf match {
+ case ProcessingTimeTimeout =>
+ require(batchTimestampMs.nonEmpty)
+ case EventTimeTimeout =>
+ require(eventTimeWatermark.nonEmpty) // watermark value has been populated
+ require(watermarkExpression.nonEmpty) // input schema has watermark attribute
+ case _ =>
+ }
+
child.execute().mapPartitionsWithStateStore[InternalRow](
getStateId.checkpointLocation,
getStateId.operatorId,
@@ -84,15 +99,23 @@ case class FlatMapGroupsWithStateExec(
groupingAttributes.toStructType,
stateAttributes.toStructType,
sqlContext.sessionState,
- Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iterator) =>
+ Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val updater = new StateStoreUpdater(store)
+ // If timeout is based on event time, then filter late data based on watermark
+ val filteredIter = watermarkPredicateForData match {
+ case Some(predicate) if timeoutConf == EventTimeTimeout =>
+ iter.filter(row => !predicate.eval(row))
+ case None =>
+ iter
+ }
+
// Generate a iterator that returns the rows grouped by the grouping function
// Note that this code ensures that the filtering for timeout occurs only after
// all the data has been processed. This is to ensure that the timeout information of all
// the keys with data is updated before they are processed for timeouts.
val outputIterator =
- updater.updateStateForKeysWithData(iterator) ++ updater.updateStateForTimedOutKeys()
+ updater.updateStateForKeysWithData(filteredIter) ++ updater.updateStateForTimedOutKeys()
// Return an iterator of all the rows generated by all the keys, such that when fully
// consumed, all the state updates will be committed by the state store
@@ -124,7 +147,7 @@ case class FlatMapGroupsWithStateExec(
private val stateSerializer = {
val encoderSerializer = stateEncoder.namedExpressions
if (isTimeoutEnabled) {
- encoderSerializer :+ Literal(KeyedStateImpl.TIMEOUT_TIMESTAMP_NOT_SET)
+ encoderSerializer :+ Literal(KeyedStateImpl.NO_TIMESTAMP)
} else {
encoderSerializer
}
@@ -157,16 +180,19 @@ case class FlatMapGroupsWithStateExec(
/** Find the groups that have timeout set and are timing out right now, and call the function */
def updateStateForTimedOutKeys(): Iterator[InternalRow] = {
if (isTimeoutEnabled) {
+ val timeoutThreshold = timeoutConf match {
+ case ProcessingTimeTimeout => batchTimestampMs.get
+ case EventTimeTimeout => eventTimeWatermark.get
+ case _ =>
+ throw new IllegalStateException(
+ s"Cannot filter timed out keys for $timeoutConf")
+ }
val timingOutKeys = store.filter { case (_, stateRow) =>
val timeoutTimestamp = getTimeoutTimestamp(stateRow)
- timeoutTimestamp != TIMEOUT_TIMESTAMP_NOT_SET && timeoutTimestamp < batchTimestampMs
+ timeoutTimestamp != NO_TIMESTAMP && timeoutTimestamp < timeoutThreshold
}
timingOutKeys.flatMap { case (keyRow, stateRow) =>
- callFunctionAndUpdateState(
- keyRow,
- Iterator.empty,
- Some(stateRow),
- hasTimedOut = true)
+ callFunctionAndUpdateState(keyRow, Iterator.empty, Some(stateRow), hasTimedOut = true)
}
} else Iterator.empty
}
@@ -186,7 +212,11 @@ case class FlatMapGroupsWithStateExec(
val valueObjIter = valueRowIter.map(getValueObj.apply) // convert value rows to objects
val stateObjOption = getStateObj(prevStateRowOption)
val keyedState = new KeyedStateImpl(
- stateObjOption, batchTimestampMs, isTimeoutEnabled, hasTimedOut)
+ stateObjOption,
+ batchTimestampMs.getOrElse(NO_TIMESTAMP),
+ eventTimeWatermark.getOrElse(NO_TIMESTAMP),
+ timeoutConf,
+ hasTimedOut)
// Call function, get the returned objects and convert them to rows
val mappedIterator = func(keyObj, valueObjIter, keyedState).map { obj =>
@@ -196,8 +226,6 @@ case class FlatMapGroupsWithStateExec(
// When the iterator is consumed, then write changes to state
def onIteratorCompletion: Unit = {
- // Has the timeout information changed
-
if (keyedState.hasRemoved) {
store.remove(keyRow)
numUpdatedStateRows += 1
@@ -205,26 +233,25 @@ case class FlatMapGroupsWithStateExec(
} else {
val previousTimeoutTimestamp = prevStateRowOption match {
case Some(row) => getTimeoutTimestamp(row)
- case None => TIMEOUT_TIMESTAMP_NOT_SET
+ case None => NO_TIMESTAMP
}
-
+ val currentTimeoutTimestamp = keyedState.getTimeoutTimestamp
val stateRowToWrite = if (keyedState.hasUpdated) {
getStateRow(keyedState.get)
} else {
prevStateRowOption.orNull
}
- val hasTimeoutChanged = keyedState.getTimeoutTimestamp != previousTimeoutTimestamp
+ val hasTimeoutChanged = currentTimeoutTimestamp != previousTimeoutTimestamp
val shouldWriteState = keyedState.hasUpdated || hasTimeoutChanged
if (shouldWriteState) {
if (stateRowToWrite == null) {
// This should never happen because checks in KeyedStateImpl should avoid cases
// where empty state would need to be written
- throw new IllegalStateException(
- "Attempting to write empty state")
+ throw new IllegalStateException("Attempting to write empty state")
}
- setTimeoutTimestamp(stateRowToWrite, keyedState.getTimeoutTimestamp)
+ setTimeoutTimestamp(stateRowToWrite, currentTimeoutTimestamp)
store.put(keyRow.copy(), stateRowToWrite.copy())
numUpdatedStateRows += 1
}
@@ -247,7 +274,7 @@ case class FlatMapGroupsWithStateExec(
/** Returns the timeout timestamp of a state row is set */
def getTimeoutTimestamp(stateRow: UnsafeRow): Long = {
- if (isTimeoutEnabled) stateRow.getLong(timeoutTimestampIndex) else TIMEOUT_TIMESTAMP_NOT_SET
+ if (isTimeoutEnabled) stateRow.getLong(timeoutTimestampIndex) else NO_TIMESTAMP
}
/** Set the timestamp in a state row */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index a934c75a02..0f0e4a91f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -108,7 +108,10 @@ class IncrementalExecution(
case m: FlatMapGroupsWithStateExec =>
val stateId =
OperatorStateId(checkpointLocation, operatorId.getAndIncrement(), currentBatchId)
- m.copy(stateId = Some(stateId), batchTimestampMs = offsetSeqMetadata.batchTimestampMs)
+ m.copy(
+ stateId = Some(stateId),
+ batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
+ eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
index ac421d395b..edfd35bd5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
@@ -17,37 +17,45 @@
package org.apache.spark.sql.execution.streaming
+import java.sql.Date
+
import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.streaming.KeyedState
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
+import org.apache.spark.sql.execution.streaming.KeyedStateImpl._
+import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout}
import org.apache.spark.unsafe.types.CalendarInterval
+
/**
* Internal implementation of the [[KeyedState]] interface. Methods are not thread-safe.
* @param optionalValue Optional value of the state
* @param batchProcessingTimeMs Processing time of current batch, used to calculate timestamp
* for processing time timeouts
- * @param isTimeoutEnabled Whether timeout is enabled. This will be used to check whether the user
- * is allowed to configure timeouts.
+ * @param timeoutConf Type of timeout configured. Based on this, different operations will
+ * be supported.
* @param hasTimedOut Whether the key for which this state wrapped is being created is
* getting timed out or not.
*/
private[sql] class KeyedStateImpl[S](
optionalValue: Option[S],
batchProcessingTimeMs: Long,
- isTimeoutEnabled: Boolean,
+ eventTimeWatermarkMs: Long,
+ timeoutConf: KeyedStateTimeout,
override val hasTimedOut: Boolean) extends KeyedState[S] {
- import KeyedStateImpl._
-
// Constructor to create dummy state when using mapGroupsWithState in a batch query
def this(optionalValue: Option[S]) = this(
- optionalValue, -1, isTimeoutEnabled = false, hasTimedOut = false)
+ optionalValue,
+ batchProcessingTimeMs = NO_TIMESTAMP,
+ eventTimeWatermarkMs = NO_TIMESTAMP,
+ timeoutConf = KeyedStateTimeout.NoTimeout,
+ hasTimedOut = false)
private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
private var defined: Boolean = optionalValue.isDefined
private var updated: Boolean = false // whether value has been updated (but not removed)
private var removed: Boolean = false // whether value has been removed
- private var timeoutTimestamp: Long = TIMEOUT_TIMESTAMP_NOT_SET
+ private var timeoutTimestamp: Long = NO_TIMESTAMP
// ========= Public API =========
override def exists: Boolean = defined
@@ -82,13 +90,14 @@ private[sql] class KeyedStateImpl[S](
defined = false
updated = false
removed = true
- timeoutTimestamp = TIMEOUT_TIMESTAMP_NOT_SET
+ timeoutTimestamp = NO_TIMESTAMP
}
override def setTimeoutDuration(durationMs: Long): Unit = {
- if (!isTimeoutEnabled) {
+ if (timeoutConf != ProcessingTimeTimeout) {
throw new UnsupportedOperationException(
- "Cannot set timeout information without enabling timeout in map/flatMapGroupsWithState")
+ "Cannot set timeout duration without enabling processing time timeout in " +
+ "map/flatMapGroupsWithState")
}
if (!defined) {
throw new IllegalStateException(
@@ -99,7 +108,7 @@ private[sql] class KeyedStateImpl[S](
if (durationMs <= 0) {
throw new IllegalArgumentException("Timeout duration must be positive")
}
- if (!removed && batchProcessingTimeMs != NO_BATCH_PROCESSING_TIMESTAMP) {
+ if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
timeoutTimestamp = durationMs + batchProcessingTimeMs
} else {
// This is being called in a batch query, hence no processing timestamp.
@@ -108,29 +117,55 @@ private[sql] class KeyedStateImpl[S](
}
override def setTimeoutDuration(duration: String): Unit = {
- if (StringUtils.isBlank(duration)) {
- throw new IllegalArgumentException(
- "The window duration, slide duration and start time cannot be null or blank.")
- }
- val intervalString = if (duration.startsWith("interval")) {
- duration
- } else {
- "interval " + duration
+ setTimeoutDuration(parseDuration(duration))
+ }
+
+ @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestampMs: Long): Unit = {
+ checkTimeoutTimestampAllowed()
+ if (timestampMs <= 0) {
+ throw new IllegalArgumentException("Timeout timestamp must be positive")
}
- val cal = CalendarInterval.fromString(intervalString)
- if (cal == null) {
+ if (eventTimeWatermarkMs != NO_TIMESTAMP && timestampMs < eventTimeWatermarkMs) {
throw new IllegalArgumentException(
- s"The provided duration ($duration) is not valid.")
+ s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
+ s"current watermark ($eventTimeWatermarkMs)")
}
- if (cal.milliseconds < 0 || cal.months < 0) {
- throw new IllegalArgumentException("Timeout duration must be positive")
+ if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
+ timeoutTimestamp = timestampMs
+ } else {
+ // This is being called in a batch query, hence no processing timestamp.
+ // Just ignore any attempts to set timeout.
}
+ }
- val delayMs = {
- val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
- cal.milliseconds + cal.months * millisPerMonth
- }
- setTimeoutDuration(delayMs)
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
+ checkTimeoutTimestampAllowed()
+ setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
+ }
+
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestamp: Date): Unit = {
+ checkTimeoutTimestampAllowed()
+ setTimeoutTimestamp(timestamp.getTime)
+ }
+
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
+ checkTimeoutTimestampAllowed()
+ setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
}
override def toString: String = {
@@ -147,14 +182,46 @@ private[sql] class KeyedStateImpl[S](
/** Return timeout timestamp or `TIMEOUT_TIMESTAMP_NOT_SET` if not set */
def getTimeoutTimestamp: Long = timeoutTimestamp
+
+ private def parseDuration(duration: String): Long = {
+ if (StringUtils.isBlank(duration)) {
+ throw new IllegalArgumentException(
+ "Provided duration is null or blank.")
+ }
+ val intervalString = if (duration.startsWith("interval")) {
+ duration
+ } else {
+ "interval " + duration
+ }
+ val cal = CalendarInterval.fromString(intervalString)
+ if (cal == null) {
+ throw new IllegalArgumentException(
+ s"Provided duration ($duration) is not valid.")
+ }
+ if (cal.milliseconds < 0 || cal.months < 0) {
+ throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
+ }
+
+ val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ cal.milliseconds + cal.months * millisPerMonth
+ }
+
+ private def checkTimeoutTimestampAllowed(): Unit = {
+ if (timeoutConf != EventTimeTimeout) {
+ throw new UnsupportedOperationException(
+ "Cannot set timeout timestamp without enabling event time timeout in " +
+ "map/flatMapGroupsWithState")
+ }
+ if (!defined) {
+ throw new IllegalStateException(
+ "Cannot set timeout timestamp without any state value, " +
+ "state has either not been initialized, or has already been removed")
+ }
+ }
}
private[sql] object KeyedStateImpl {
- // Value used in the state row to represent the lack of any timeout timestamp
- val TIMEOUT_TIMESTAMP_NOT_SET = -1L
-
- // Value to represent that no batch processing timestamp is passed to KeyedStateImpl. This is
- // used in batch queries where there are no streaming batches and timeouts.
- val NO_BATCH_PROCESSING_TIMESTAMP = -1L
+ // Value used represent the lack of valid timestamp as a long
+ val NO_TIMESTAMP = -1L
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 6d2de441eb..f72144a25d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -80,7 +80,7 @@ trait WatermarkSupport extends UnaryExecNode {
/** Generate an expression that matches data older than the watermark */
lazy val watermarkExpression: Option[Expression] = {
val optionalWatermarkAttribute =
- keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
+ child.output.find(_.metadata.contains(EventTimeWatermark.delayKey))
optionalWatermarkAttribute.map { watermarkAttribute =>
// If we are evicting based on a window, use the end of the window. Otherwise just
@@ -101,14 +101,12 @@ trait WatermarkSupport extends UnaryExecNode {
}
}
- /** Generate a predicate based on keys that matches data older than the watermark */
+ /** Predicate based on keys that matches data older than the watermark */
lazy val watermarkPredicateForKeys: Option[Predicate] =
watermarkExpression.map(newPredicate(_, keyExpressions))
- /**
- * Generate a predicate based on the child output that matches data older than the watermark.
- */
- lazy val watermarkPredicate: Option[Predicate] =
+ /** Predicate based on the child output that matches data older than the watermark. */
+ lazy val watermarkPredicateForData: Option[Predicate] =
watermarkExpression.map(newPredicate(_, child.output))
}
@@ -218,7 +216,7 @@ case class StateStoreSaveExec(
new Iterator[InternalRow] {
// Filter late date using watermark if specified
- private[this] val baseIterator = watermarkPredicate match {
+ private[this] val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
case None => iter
}
@@ -285,7 +283,7 @@ case class StreamingDeduplicateExec(
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
- val baseIterator = watermarkPredicate match {
+ val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter(row => !predicate.eval(row))
case None => iter
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
index 6b4b1ced98..461de04f6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
@@ -55,7 +55,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
* batch, nor with streaming Datasets.
* - All the data will be shuffled before applying the function.
* - If timeout is set, then the function will also be called with no values.
- * See more details on KeyedStateTimeout` below.
+ * See more details on `KeyedStateTimeout` below.
*
* Important points to note about using `KeyedState`.
* - The value of the state cannot be null. So updating state with null will throw
@@ -68,20 +68,38 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
*
* Important points to note about using `KeyedStateTimeout`.
* - The timeout type is a global param across all the keys (set as `timeout` param in
- * `[map|flatMap]GroupsWithState`, but the exact timeout duration is configurable per key
- * (by calling `setTimeout...()` in `KeyedState`).
- * - When the timeout occurs for a key, the function is called with no values, and
+ * `[map|flatMap]GroupsWithState`, but the exact timeout duration/timestamp is configurable per
+ * key by calling `setTimeout...()` in `KeyedState`.
+ * - Timeouts can be either based on processing time (i.e.
+ * [[KeyedStateTimeout.ProcessingTimeTimeout]]) or event time (i.e.
+ * [[KeyedStateTimeout.EventTimeTimeout]]).
+ * - With `ProcessingTimeTimeout`, the timeout duration can be set by calling
+ * `KeyedState.setTimeoutDuration`. The timeout will occur when the clock has advanced by the set
+ * duration. Guarantees provided by this timeout with a duration of D ms are as follows:
+ * - Timeout will never be occur before the clock time has advanced by D ms
+ * - Timeout will occur eventually when there is a trigger in the query
+ * (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur.
+ * For example, the trigger interval of the query will affect when the timeout actually occurs.
+ * If there is no data in the stream (for any key) for a while, then their will not be
+ * any trigger and timeout function call will not occur until there is data.
+ * - Since the processing time timeout is based on the clock time, it is affected by the
+ * variations in the system clock (i.e. time zone changes, clock skew, etc.).
+ * - With `EventTimeTimeout`, the user also has to specify the the the event time watermark in
+ * the query using `Dataset.withWatermark()`. With this setting, data that is older than the
+ * watermark are filtered out. The timeout can be enabled for a key by setting a timestamp using
+ * `KeyedState.setTimeoutTimestamp()`, and the timeout would occur when the watermark advances
+ * beyond the set timestamp. You can control the timeout delay by two parameters - (i) watermark
+ * delay and an additional duration beyond the timestamp in the event (which is guaranteed to
+ * > watermark due to the filtering). Guarantees provided by this timeout are as follows:
+ * - Timeout will never be occur before watermark has exceeded the set timeout.
+ * - Similar to processing time timeouts, there is a no strict upper bound on the delay when
+ * the timeout actually occurs. The watermark can advance only when there is data in the
+ * stream, and the event time of the data has actually advanced.
+ * - When the timeout occurs for a key, the function is called for that key with no values, and
* `KeyedState.hasTimedOut()` set to true.
* - The timeout is reset for key every time the function is called on the key, that is,
* when the key has new data, or the key has timed out. So the user has to set the timeout
* duration every time the function is called, otherwise there will not be any timeout set.
- * - Guarantees provided on processing-time-based timeout of key, when timeout duration is D ms:
- * - Timeout will never be called before real clock time has advanced by D ms
- * - Timeout will be called eventually when there is a trigger in the query
- * (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur.
- * For example, the trigger interval of the query will affect when the timeout is actually hit.
- * If there is no data in the stream (for any key) for a while, then their will not be
- * any trigger and timeout will not be hit until there is data.
*
* Scala example of using KeyedState in `mapGroupsWithState`:
* {{{
@@ -194,7 +212,8 @@ trait KeyedState[S] extends LogicalKeyedState[S] {
/**
* Set the timeout duration in ms for this key.
- * @note Timeouts must be enabled in `[map/flatmap]GroupsWithStates`.
+ *
+ * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
@throws[IllegalArgumentException]("if 'durationMs' is not positive")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@@ -204,11 +223,63 @@ trait KeyedState[S] extends LogicalKeyedState[S] {
/**
* Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
- * @note, Timeouts must be enabled in `[map/flatmap]GroupsWithStates`.
+ *
+ * @note, ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
*/
@throws[IllegalArgumentException]("if 'duration' is not a valid duration")
@throws[IllegalStateException]("when state is either not initialized, or already removed")
@throws[UnsupportedOperationException](
"if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
def setTimeoutDuration(duration: String): Unit
+
+ @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as milliseconds in epoch time.
+ * This timestamp cannot be older than the current watermark.
+ *
+ * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+ */
+ def setTimeoutTimestamp(timestampMs: Long): Unit
+
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
+ * duration as a string (e.g. "1 hour", "2 days", etc.).
+ * The final timestamp (including the additional duration) cannot be older than the
+ * current watermark.
+ *
+ * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+ */
+ def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
+
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as a java.sql.Date.
+ * This timestamp cannot be older than the current watermark.
+ *
+ * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+ */
+ def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
+
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as a java.sql.Date and an additional
+ * duration as a string (e.g. "1 hour", "2 days", etc.).
+ * The final timestamp (including the additional duration) cannot be older than the
+ * current watermark.
+ *
+ * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+ */
+ def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
}