aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-12-15 11:54:35 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-15 11:54:35 -0800
commit4f7292c87512a7da3542998d0e5aa21c27a511e9 (patch)
treee1c5184f76024c3c106fae7c85c484022463e1d7 /sql
parent01e14bf303e61a5726f3b1418357a50c1bf8b16f (diff)
downloadspark-4f7292c87512a7da3542998d0e5aa21c27a511e9.tar.gz
spark-4f7292c87512a7da3542998d0e5aa21c27a511e9.tar.bz2
spark-4f7292c87512a7da3542998d0e5aa21c27a511e9.zip
[SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets
## What changes were proposed in this pull request? Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true. ## How was this patch tested? Added unit test Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16289 from tdas/SPARK-18870.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala15
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala13
2 files changed, 26 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c054fcbef3..c4a78f9d21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.OutputMode
@@ -95,6 +96,16 @@ object UnsupportedOperationChecker {
// Operations that cannot exists anywhere in a streaming plan
subPlan match {
+ case Aggregate(_, aggregateExpressions, child) =>
+ val distinctAggExprs = aggregateExpressions.flatMap { expr =>
+ expr.collect { case ae: AggregateExpression if ae.isDistinct => ae }
+ }
+ throwErrorIf(
+ child.isStreaming && distinctAggExprs.nonEmpty,
+ "Distinct aggregations are not supported on streaming DataFrames/Datasets, unless " +
+ "it is on aggregated DataFrame/Dataset in Complete output mode. Consider using " +
+ "approximate distinct aggregation (e.g. approx_count_distinct() instead of count()).")
+
case _: Command =>
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
@@ -143,7 +154,7 @@ object UnsupportedOperationChecker {
throwError("Union between streaming and batch DataFrames/Datasets is not supported")
case Except(left, right) if right.isStreaming =>
- throwError("Except with a streaming DataFrame/Dataset on the right is not supported")
+ throwError("Except on a streaming DataFrame/Dataset on the right is not supported")
case Intersect(left, right) if left.isStreaming && right.isStreaming =>
throwError("Intersect between two streaming DataFrames/Datasets is not supported")
@@ -156,7 +167,7 @@ object UnsupportedOperationChecker {
case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
- "aggregated DataFrame/Dataset in Complete mode")
+ "aggregated DataFrame/Dataset in Complete output mode")
case Sample(_, _, _, _, child) if child.isStreaming =>
throwError("Sampling is not supported on streaming DataFrames/Datasets")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index ff1bb126f4..34e94c7142 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -98,6 +98,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Update,
expectedMsgs = Seq("multiple streaming aggregations"))
+ // Aggregation: Distinct aggregates not supported on streaming relation
+ val distinctAggExprs = Seq(Count("*").toAggregateExpression(isDistinct = true).as("c"))
+ assertSupportedInStreamingPlan(
+ "distinct aggregate - aggregate on batch relation",
+ Aggregate(Nil, distinctAggExprs, batchRelation),
+ outputMode = Append)
+
+ assertNotSupportedInStreamingPlan(
+ "distinct aggregate - aggregate on streaming relation",
+ Aggregate(Nil, distinctAggExprs, streamRelation),
+ outputMode = Complete,
+ expectedMsgs = Seq("distinct aggregation"))
+
// Inner joins: Stream-stream not supported
testBinaryOperationInStreamingPlan(
"inner join",