author     Shixiong Zhu <shixiong@databricks.com>        2017-02-23 11:25:39 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2017-02-23 11:25:39 -0800
commit     9bf4e2baad0e2851da554d85223ffaa029cfa490
tree       a08773e6a82e7d5fa78ca2f71d707e74be36a9cc /sql/catalyst
parent     7bf09433f5c5e08154ba106be21fe24f17cd282b
[SPARK-19497][SS] Implement streaming deduplication
## What changes were proposed in this pull request?

This PR adds a special streaming deduplication operator to support `dropDuplicates` together with aggregation and watermarks. It reuses the `dropDuplicates` API but creates a new logical plan `Deduplicate` and a new physical plan `StreamingDeduplicateExec`.

The following cases are supported:

- one or multiple `dropDuplicates()` without aggregation (with or without watermark)
- `dropDuplicates` before aggregation

Not supported cases:

- `dropDuplicates` after aggregation

Breaking changes:

- `dropDuplicates` without aggregation doesn't work with `complete` or `update` mode.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16970 from zsxwing/dedup.
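For context, a minimal sketch of how the new operator is exercised from the Dataset API. This is not part of the patch; the `rate` source and column names are illustrative placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().appName("dedup-sketch").getOrCreate()

// Any streaming DataFrame with an event-time column works the same way;
// the rate source is just a convenient stand-in.
val events = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")

// Deduplicate by key. Including the watermarked event-time column in the
// keys lets the engine drop deduplication state older than the watermark;
// without a watermark the state would grow without bound.
val deduped = events
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("value", "eventTime")

val query = deduped.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .start()
```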
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala |  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala                  | 21
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala  |  9
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala  | 56
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala       | 33
5 files changed, 121 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 07b3558ee2..397f5cfe2a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -75,7 +75,7 @@ object UnsupportedOperationChecker {
if (watermarkAttributes.isEmpty) {
throwError(
s"$outputMode output mode not supported when there are streaming aggregations on " +
- s"streaming DataFrames/DataSets")(plan)
+ s"streaming DataFrames/DataSets without watermark")(plan)
}
case InternalOutputModes.Complete if aggregates.isEmpty =>
@@ -120,6 +120,10 @@ object UnsupportedOperationChecker {
throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " +
"streaming DataFrame/Dataset")
+ case d: Deduplicate if collectStreamingAggregates(d).nonEmpty =>
+ throwError("dropDuplicates is not supported after aggregation on a " +
+ "streaming DataFrame/Dataset")
+
case Join(left, right, joinType, _) =>
joinType match {
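The new `Deduplicate` case above rejects deduplication placed after a streaming aggregation. A hedged sketch of a query shape that now fails analysis, reusing the illustrative `events` stream from the earlier sketch:

```scala
// dropDuplicates on the result of a streaming aggregation: the checker
// throws "dropDuplicates is not supported after aggregation on a
// streaming DataFrame/Dataset" when the query is started.
val counts  = events.groupBy("value").count()
val invalid = counts.dropDuplicates("value")

invalid.writeStream
  .format("console")
  .outputMode(OutputMode.Complete())
  .start() // <- fails here, in UnsupportedOperationChecker
```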
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index af846a09a8..036da3ad20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -56,7 +56,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
ReplaceExpressions,
ComputeCurrentTime,
GetCurrentDatabase(sessionCatalog),
- RewriteDistinctAggregates) ::
+ RewriteDistinctAggregates,
+ ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
@@ -1143,6 +1144,24 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
}
/**
+ * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
+ */
+object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Deduplicate(keys, child, streaming) if !streaming =>
+ val keyExprIds = keys.map(_.exprId)
+ val aggCols = child.output.map { attr =>
+ if (keyExprIds.contains(attr.exprId)) {
+ attr
+ } else {
+ Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
+ }
+ }
+ Aggregate(keys, aggCols, child)
+ }
+}
+
+/**
* Replaces logical [[Intersect]] operator with a left-semi [[Join]] operator.
* {{{
* SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
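The effect of `ReplaceDeduplicateWithAggregate` on a batch plan can be pictured at the DataFrame level. A sketch, assuming a batch DataFrame `df` with columns `a` and `b` (names illustrative):

```scala
import org.apache.spark.sql.functions.first

// df.dropDuplicates("a") is rewritten into an Aggregate that groups by the
// key and keeps the first observed value of every non-key column. The rule
// reuses the original expression IDs, so downstream references stay valid.
val viaDedup     = df.dropDuplicates("a")
val viaAggregate = df.groupBy("a").agg(first("b").as("b"))
// Both yield one row per distinct value of "a".
```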
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d17d12cd83..ce1c55dc08 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -864,3 +864,12 @@ case object OneRowRelation extends LeafNode {
override def output: Seq[Attribute] = Nil
override def computeStats(conf: CatalystConf): Statistics = Statistics(sizeInBytes = 1)
}
+
+/** A logical plan for `dropDuplicates`. */
+case class Deduplicate(
+ keys: Seq[Attribute],
+ child: LogicalPlan,
+ streaming: Boolean) extends UnaryNode {
+
+ override def output: Seq[Attribute] = child.output
+}
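This diffstat is limited to `sql/catalyst`, so the `Dataset.dropDuplicates` wiring that produces this node lives elsewhere in the full patch. Assuming that wiring and a local `spark` session, a sketch of inspecting the plans:

```scala
val df = spark.range(10).selectExpr("id % 3 AS a", "id AS b")

// The analyzed plan contains Deduplicate(keys = [a], streaming = false) ...
println(df.dropDuplicates("a").queryExecution.analyzed)
// ... which ReplaceDeduplicateWithAggregate turns into an Aggregate:
println(df.dropDuplicates("a").queryExecution.optimizedPlan)
```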
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 3b756e89d9..82be69a0f7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{MapGroupsWithState, _}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{IntegerType, LongType}
+import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder}
+import org.apache.spark.unsafe.types.CalendarInterval
/** A dummy command for testing unsupported operations. */
case class DummyCommand() extends Command
@@ -36,6 +37,11 @@ case class DummyCommand() extends Command
class UnsupportedOperationsSuite extends SparkFunSuite {
val attribute = AttributeReference("a", IntegerType, nullable = true)()
+ val watermarkMetadata = new MetadataBuilder()
+ .withMetadata(attribute.metadata)
+ .putLong(EventTimeWatermark.delayKey, 1000L)
+ .build()
+ val attributeWithWatermark = attribute.withMetadata(watermarkMetadata)
val batchRelation = LocalRelation(attribute)
val streamRelation = new TestStreamingRelation(attribute)
@@ -98,6 +104,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Update,
expectedMsgs = Seq("multiple streaming aggregations"))
+ assertSupportedInStreamingPlan(
+ "aggregate - streaming aggregations in update mode",
+ Aggregate(Nil, aggExprs("d"), streamRelation),
+ outputMode = Update)
+
+ assertSupportedInStreamingPlan(
+ "aggregate - streaming aggregations in complete mode",
+ Aggregate(Nil, aggExprs("d"), streamRelation),
+ outputMode = Complete)
+
+ assertSupportedInStreamingPlan(
+ "aggregate - streaming aggregations with watermark in append mode",
+ Aggregate(Seq(attributeWithWatermark), aggExprs("d"), streamRelation),
+ outputMode = Append)
+
+ assertNotSupportedInStreamingPlan(
+ "aggregate - streaming aggregations without watermark in append mode",
+ Aggregate(Nil, aggExprs("d"), streamRelation),
+ outputMode = Append,
+ expectedMsgs = Seq("streaming aggregations", "without watermark"))
+
// Aggregation: Distinct aggregates not supported on streaming relation
val distinctAggExprs = Seq(Count("*").toAggregateExpression(isDistinct = true).as("c"))
assertSupportedInStreamingPlan(
@@ -129,6 +156,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Complete,
expectedMsgs = Seq("(map/flatMap)GroupsWithState"))
+ assertSupportedInStreamingPlan(
+ "mapGroupsWithState - mapGroupsWithState on batch relation inside streaming relation",
+ MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), batchRelation),
+ outputMode = Append
+ )
+
+ // Deduplicate
+ assertSupportedInStreamingPlan(
+ "Deduplicate - Deduplicate on streaming relation before aggregation",
+ Aggregate(
+ Seq(attributeWithWatermark),
+ aggExprs("c"),
+ Deduplicate(Seq(att), streamRelation, streaming = true)),
+ outputMode = Append)
+
+ assertNotSupportedInStreamingPlan(
+ "Deduplicate - Deduplicate on streaming relation after aggregation",
+ Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation), streaming = true),
+ outputMode = Complete,
+ expectedMsgs = Seq("dropDuplicates"))
+
+ assertSupportedInStreamingPlan(
+ "Deduplicate - Deduplicate on batch relation inside a streaming query",
+ Deduplicate(Seq(att), batchRelation, streaming = false),
+ outputMode = Append
+ )
+
// Inner joins: Stream-stream not supported
testBinaryOperationInStreamingPlan(
"inner join",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index f23e262f28..e68423f85c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -30,7 +32,8 @@ class ReplaceOperatorSuite extends PlanTest {
Batch("Replace Operators", FixedPoint(100),
ReplaceDistinctWithAggregate,
ReplaceExceptWithAntiJoin,
- ReplaceIntersectWithSemiJoin) :: Nil
+ ReplaceIntersectWithSemiJoin,
+ ReplaceDeduplicateWithAggregate) :: Nil
}
test("replace Intersect with Left-semi Join") {
@@ -71,4 +74,32 @@ class ReplaceOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+
+ test("replace batch Deduplicate with Aggregate") {
+ val input = LocalRelation('a.int, 'b.int)
+ val attrA = input.output(0)
+ val attrB = input.output(1)
+ val query = Deduplicate(Seq(attrA), input, streaming = false) // dropDuplicates("a")
+ val optimized = Optimize.execute(query.analyze)
+
+ val correctAnswer =
+ Aggregate(
+ Seq(attrA),
+ Seq(
+ attrA,
+ Alias(new First(attrB).toAggregateExpression(), attrB.name)(attrB.exprId)
+ ),
+ input)
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("don't replace streaming Deduplicate") {
+ val input = LocalRelation('a.int, 'b.int)
+ val attrA = input.output(0)
+ val query = Deduplicate(Seq(attrA), input, streaming = true) // dropDuplicates("a")
+ val optimized = Optimize.execute(query.analyze)
+
+ comparePlans(optimized, query)
+ }
}