author     Shixiong Zhu <shixiong@databricks.com>        2017-02-23 11:25:39 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2017-02-23 11:25:39 -0800
commit     9bf4e2baad0e2851da554d85223ffaa029cfa490 (patch)
tree       a08773e6a82e7d5fa78ca2f71d707e74be36a9cc /sql/core/src/main/scala/org
parent     7bf09433f5c5e08154ba106be21fe24f17cd282b (diff)
[SPARK-19497][SS] Implement streaming deduplication
## What changes were proposed in this pull request?

This PR adds a special streaming deduplication operator to support `dropDuplicates` with aggregation and watermark. It reuses the `dropDuplicates` API but introduces a new logical plan node, `Deduplicate`, and a new physical plan node, `StreamingDeduplicateExec`.

Supported cases:
- one or more `dropDuplicates()` calls without aggregation (with or without a watermark)
- `dropDuplicates` before an aggregation

Not supported:
- `dropDuplicates` after an aggregation

Breaking changes:
- `dropDuplicates` without aggregation does not work with `complete` or `update` output mode.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16970 from zsxwing/dedup.
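For orientation, a minimal usage sketch of the streaming deduplication this commit enables. The Kafka source options and the `guid`/`eventTime` column names are illustrative placeholders, not part of this change:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch; the source, its options, and the column names are illustrative only.
val spark = SparkSession.builder().appName("streaming-dedup-sketch").getOrCreate()

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS guid", "timestamp AS eventTime")

// Without a watermark, every key ever seen is kept as state indefinitely.
// With a watermark, state for keys older than the watermark can be evicted, and
// duplicates arriving later than the watermark are dropped on arrival.
val deduped = events
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("guid", "eventTime")

// Per the breaking change above: dropDuplicates without a following aggregation
// only works in append mode, not complete or update.
val query = deduped.writeStream
  .format("console")
  .outputMode("append")
  .start()
```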
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                                    39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                  15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala   10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala     140
4 files changed, 160 insertions(+), 44 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 1ebc53d2bb..3c212d656e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -557,7 +557,8 @@ class Dataset[T] private[sql](
* Spark will use this watermark for several purposes:
* - To know when a given time window aggregation can be finalized and thus can be emitted when
* using output modes that do not allow updates.
- * - To minimize the amount of state that we need to keep for on-going aggregations.
+ * - To minimize the amount of state that we need to keep for on-going aggregations and the
+ * `mapGroupsWithState` and `dropDuplicates` operators.
*
* The current watermark is computed by looking at the `MAX(eventTime)` seen across
* all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
@@ -1981,6 +1982,12 @@ class Dataset[T] private[sql](
* Returns a new Dataset that contains only the unique rows from this Dataset.
* This is an alias for `distinct`.
*
+ * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
+ * will keep all data across triggers as intermediate state to drop duplicate rows. You can use
+ * [[withWatermark]] to limit how late the duplicate data can be and the system will accordingly
+ * limit the state. In addition, data that is too late, i.e. older than the watermark, will be
+ * dropped to avoid any possibility of duplicates.
+ *
* @group typedrel
* @since 2.0.0
*/
@@ -1990,13 +1997,19 @@ class Dataset[T] private[sql](
* (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
* the subset of columns.
*
+ * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
+ * will keep all data across triggers as intermediate state to drop duplicate rows. You can use
+ * [[withWatermark]] to limit how late the duplicate data can be and the system will accordingly
+ * limit the state. In addition, data that is too late, i.e. older than the watermark, will be
+ * dropped to avoid any possibility of duplicates.
+ *
* @group typedrel
* @since 2.0.0
*/
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
- val groupCols = colNames.flatMap { colName =>
+ val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
// It is possible that there is more than one column with the same name,
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))
@@ -2006,21 +2019,19 @@ class Dataset[T] private[sql](
}
cols
}
- val groupColExprIds = groupCols.map(_.exprId)
- val aggCols = logicalPlan.output.map { attr =>
- if (groupColExprIds.contains(attr.exprId)) {
- attr
- } else {
- Alias(new First(attr).toAggregateExpression(), attr.name)()
- }
- }
- Aggregate(groupCols, aggCols, logicalPlan)
+ Deduplicate(groupCols, logicalPlan, isStreaming)
}
/**
* Returns a new Dataset with duplicate rows removed, considering only
* the subset of columns.
*
+ * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
+ * will keep all data across triggers as intermediate state to drop duplicate rows. You can use
+ * [[withWatermark]] to limit how late the duplicate data can be and the system will accordingly
+ * limit the state. In addition, data that is too late, i.e. older than the watermark, will be
+ * dropped to avoid any possibility of duplicates.
+ *
* @group typedrel
* @since 2.0.0
*/
@@ -2030,6 +2041,12 @@ class Dataset[T] private[sql](
* Returns a new [[Dataset]] with duplicate rows removed, considering only
* the subset of columns.
*
+ * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
+ * will keep all data across triggers as intermediate state to drop duplicate rows. You can use
+ * [[withWatermark]] to limit how late the duplicate data can be and the system will accordingly
+ * limit the state. In addition, data that is too late, i.e. older than the watermark, will be
+ * dropped to avoid any possibility of duplicates.
+ *
* @group typedrel
* @since 2.0.0
*/
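The key change in the Dataset.scala hunk above is that `dropDuplicates` now produces a dedicated `Deduplicate` logical node instead of rewriting into an `Aggregate` that groups on the key columns and takes `First` of every other column. For reference, a hedged DataFrame-level sketch of what that old rewrite amounted to; the helper name and the guard for key-only Datasets are additions here, and the resulting column order may differ from the original Dataset:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.first

// Conceptual equivalent of the pre-commit rewrite of dropDuplicates(keys),
// expressed with the public DataFrame API rather than Catalyst nodes.
def dedupViaAggregate(df: DataFrame, keys: Seq[String]): DataFrame = {
  val keyCols = keys.map(df.col)
  val otherCols = df.columns.filterNot(keys.contains)
  if (otherCols.isEmpty) {
    // Every column is a key: plain distinct.
    df.distinct()
  } else {
    // Group on the keys and keep the first value of every remaining column.
    val aggExprs = otherCols.map(c => first(c).as(c))
    df.groupBy(keyCols: _*).agg(aggExprs.head, aggExprs.tail: _*)
  }
}
```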
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0e3d5595df..027b1481af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -22,9 +22,10 @@ import org.apache.spark.sql.{SaveMode, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, EventTimeWatermark, LogicalPlan, MapGroupsWithState}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
@@ -245,6 +246,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
/**
+ * Used to plan the streaming deduplicate operator.
+ */
+ object StreamingDeduplicationStrategy extends Strategy {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case Deduplicate(keys, child, true) =>
+ StreamingDeduplicateExec(keys, planLater(child)) :: Nil
+
+ case _ => Nil
+ }
+ }
+
+ /**
* Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
*/
object Aggregation extends Strategy {
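`StreamingDeduplicationStrategy` above follows the usual Catalyst `Strategy` contract: match the logical nodes it can plan (here only streaming `Deduplicate` nodes, i.e. `isStreaming == true`) and return `Nil` for everything else so other strategies get a chance; batch `Deduplicate` nodes are planned elsewhere. As a hedged sketch of that same shape from user code, a custom strategy can be plugged in through the `extraStrategies` hook that `IncrementalExecution` also consults below; the `PassThroughStrategy` name is illustrative and not part of this commit:

```scala
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Same match-or-Nil shape as StreamingDeduplicationStrategy; this one claims
// nothing and therefore defers every node to the built-in strategies.
object PassThroughStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case _ => Nil
  }
}

val spark = SparkSession.builder().appName("strategy-sketch").getOrCreate()
spark.experimental.extraStrategies = PassThroughStrategy :: Nil
```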
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index a3e108b29e..ffdcd9b19d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -45,6 +45,7 @@ class IncrementalExecution(
sparkSession.sessionState.planner.StatefulAggregationStrategy +:
sparkSession.sessionState.planner.MapGroupsWithStateStrategy +:
sparkSession.sessionState.planner.StreamingRelationStrategy +:
+ sparkSession.sessionState.planner.StreamingDeduplicationStrategy +:
sparkSession.sessionState.experimentalMethods.extraStrategies
// Modified planner with stateful operations.
@@ -93,6 +94,15 @@ class IncrementalExecution(
keys,
Some(stateId),
child) :: Nil))
+ case StreamingDeduplicateExec(keys, child, None, None) =>
+ val stateId =
+ OperatorStateId(checkpointLocation, operatorId.getAndIncrement(), currentBatchId)
+
+ StreamingDeduplicateExec(
+ keys,
+ child,
+ Some(stateId),
+ Some(currentEventTimeWatermark))
case MapGroupsWithStateExec(
f, kDeser, vDeser, group, data, output, None, stateDeser, stateSer, child) =>
val stateId =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1292452574..d92529748b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -25,12 +25,11 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjecti
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalKeyedState}
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
-import org.apache.spark.sql.execution
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, NullType, StructType}
import org.apache.spark.util.CompletionIterator
@@ -68,6 +67,40 @@ trait StateStoreWriter extends StatefulOperator {
"numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"))
}
+/** An operator that supports watermark. */
+trait WatermarkSupport extends SparkPlan {
+
+ /** The keys that may have a watermark attribute. */
+ def keyExpressions: Seq[Attribute]
+
+ /** The watermark value. */
+ def eventTimeWatermark: Option[Long]
+
+ /** Generate a predicate that matches data older than the watermark */
+ lazy val watermarkPredicate: Option[Predicate] = {
+ val optionalWatermarkAttribute =
+ keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
+
+ optionalWatermarkAttribute.map { watermarkAttribute =>
+ // If we are evicting based on a window, use the end of the window. Otherwise just
+ // use the attribute itself.
+ val evictionExpression =
+ if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+ LessThanOrEqual(
+ GetStructField(watermarkAttribute, 1),
+ Literal(eventTimeWatermark.get * 1000))
+ } else {
+ LessThanOrEqual(
+ watermarkAttribute,
+ Literal(eventTimeWatermark.get * 1000))
+ }
+
+ logInfo(s"Filtering state store on: $evictionExpression")
+ newPredicate(evictionExpression, keyExpressions)
+ }
+ }
+}
+
/**
* For each input tuple, the key is calculated and the value from the [[StateStore]] is added
* to the stream (in addition to the input tuple) if present.
@@ -76,7 +109,7 @@ case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
child: SparkPlan)
- extends execution.UnaryExecNode with StateStoreReader {
+ extends UnaryExecNode with StateStoreReader {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
@@ -113,31 +146,7 @@ case class StateStoreSaveExec(
outputMode: Option[OutputMode] = None,
eventTimeWatermark: Option[Long] = None,
child: SparkPlan)
- extends execution.UnaryExecNode with StateStoreWriter {
-
- /** Generate a predicate that matches data older than the watermark */
- private lazy val watermarkPredicate: Option[Predicate] = {
- val optionalWatermarkAttribute =
- keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
-
- optionalWatermarkAttribute.map { watermarkAttribute =>
- // If we are evicting based on a window, use the end of the window. Otherwise just
- // use the attribute itself.
- val evictionExpression =
- if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
- LessThanOrEqual(
- GetStructField(watermarkAttribute, 1),
- Literal(eventTimeWatermark.get * 1000))
- } else {
- LessThanOrEqual(
- watermarkAttribute,
- Literal(eventTimeWatermark.get * 1000))
- }
-
- logInfo(s"Filtering state store on: $evictionExpression")
- newPredicate(evictionExpression, keyExpressions)
- }
- }
+ extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
@@ -146,8 +155,8 @@ case class StateStoreSaveExec(
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
- operatorId = getStateId.operatorId,
- storeVersion = getStateId.batchId,
+ getStateId.operatorId,
+ getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
@@ -262,8 +271,8 @@ case class MapGroupsWithStateExec(
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsWithStateStore[InternalRow](
getStateId.checkpointLocation,
- operatorId = getStateId.operatorId,
- storeVersion = getStateId.batchId,
+ getStateId.operatorId,
+ getStateId.batchId,
groupingAttributes.toStructType,
child.output.toStructType,
sqlContext.sessionState,
@@ -321,3 +330,70 @@ case class MapGroupsWithStateExec(
}
}
}
+
+
+/** Physical operator for executing streaming Deduplicate. */
+case class StreamingDeduplicateExec(
+ keyExpressions: Seq[Attribute],
+ child: SparkPlan,
+ stateId: Option[OperatorStateId] = None,
+ eventTimeWatermark: Option[Long] = None)
+ extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+ /** Distribute by grouping attributes */
+ override def requiredChildDistribution: Seq[Distribution] =
+ ClusteredDistribution(keyExpressions) :: Nil
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ metrics // force lazy init at driver
+
+ child.execute().mapPartitionsWithStateStore(
+ getStateId.checkpointLocation,
+ getStateId.operatorId,
+ getStateId.batchId,
+ keyExpressions.toStructType,
+ child.output.toStructType,
+ sqlContext.sessionState,
+ Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+ val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+ val numOutputRows = longMetric("numOutputRows")
+ val numTotalStateRows = longMetric("numTotalStateRows")
+ val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+ val baseIterator = watermarkPredicate match {
+ case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+ case None => iter
+ }
+
+ val result = baseIterator.filter { r =>
+ val row = r.asInstanceOf[UnsafeRow]
+ val key = getKey(row)
+ val value = store.get(key)
+ if (value.isEmpty) {
+ store.put(key.copy(), StreamingDeduplicateExec.EMPTY_ROW)
+ numUpdatedStateRows += 1
+ numOutputRows += 1
+ true
+ } else {
+ // Drop duplicated rows
+ false
+ }
+ }
+
+ CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
+ watermarkPredicate.foreach(f => store.remove(f.eval _))
+ store.commit()
+ numTotalStateRows += store.numKeys()
+ })
+ }
+ }
+
+ override def output: Seq[Attribute] = child.output
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+}
+
+object StreamingDeduplicateExec {
+ private val EMPTY_ROW =
+ UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+}
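The heart of `StreamingDeduplicateExec.doExecute` above is a stateful filter: each row's key is looked up in the `StateStore`, unseen keys are emitted and recorded, already-seen keys are dropped, and once the iterator is exhausted the `CompletionIterator` evicts keys older than the watermark and commits. A self-contained, hedged sketch of that flow, with a mutable `Map` standing in for the state store and `Long` timestamps in place of `UnsafeRow` keys (none of the names below are Spark APIs):

```scala
import scala.collection.mutable

final case class Event(guid: String, eventTimeMs: Long)

final class DedupSketch {
  // Key -> event time at which the key was first seen (stand-in for the StateStore).
  private val seen = mutable.HashMap.empty[String, Long]

  /** Emit only first occurrences; drop rows at or before the watermark on arrival. */
  def process(batch: Iterator[Event], watermarkMs: Long): Iterator[Event] = {
    val kept = batch.filter { e =>
      if (e.eventTimeMs <= watermarkMs) {
        false // too late: drop so old duplicates cannot be re-admitted after eviction
      } else if (seen.contains(e.guid)) {
        false // duplicate within the retained state
      } else {
        seen.put(e.guid, e.eventTimeMs)
        true // first occurrence: emit
      }
    }.toVector // materialize before evicting, mirroring the CompletionIterator ordering
    // Evict state that the watermark guarantees is no longer needed.
    seen.keys.toList.foreach { k => if (seen(k) <= watermarkMs) seen.remove(k) }
    kept.iterator
  }
}

// Usage: duplicates are dropped while their key is still in state, and state stays bounded.
val dedup = new DedupSketch
val out1 = dedup.process(Iterator(Event("a", 100), Event("a", 101), Event("b", 105)), watermarkMs = 0)
println(out1.map(_.guid).toList) // List(a, b)
val out2 = dedup.process(Iterator(Event("a", 200), Event("c", 90)), watermarkMs = 95)
println(out2.map(_.guid).toList) // List() -- "a" duplicates retained state, "c" is behind the watermark
```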