aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/test
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-01-10 17:58:11 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-01-10 17:58:11 -0800
commitbc6c56e940fe93591a1e5ba45751f1b243b57e28 (patch)
treea1cdea8a10912863fbdd6c8b7fe50951e57bf78d /sql/catalyst/src/test
parent856bae6af64982ae0221948c58ff564887e54a70 (diff)
downloadspark-bc6c56e940fe93591a1e5ba45751f1b243b57e28.tar.gz
spark-bc6c56e940fe93591a1e5ba45751f1b243b57e28.tar.bz2
spark-bc6c56e940fe93591a1e5ba45751f1b243b57e28.zip
[SPARK-19140][SS] Allow update mode for non-aggregation streaming queries
## What changes were proposed in this pull request? This PR allow update mode for non-aggregation streaming queries. It will be same as the append mode if a query has no aggregations. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16520 from zsxwing/update-without-agg.
Diffstat (limited to 'sql/catalyst/src/test')
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala31
1 files changed, 17 insertions, 14 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index d2c0f8cc9f..58e69f9ebe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
// Output modes with aggregation and non-aggregation plans
- testOutputMode(Append, shouldSupportAggregation = false)
- testOutputMode(Update, shouldSupportAggregation = true)
- testOutputMode(Complete, shouldSupportAggregation = true)
+ testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
+ testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
+ testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
/*
=======================================================================================
@@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
/** Test output mode with and without aggregation in the streaming plan */
def testOutputMode(
outputMode: OutputMode,
- shouldSupportAggregation: Boolean): Unit = {
+ shouldSupportAggregation: Boolean,
+ shouldSupportNonAggregation: Boolean): Unit = {
// aggregation
if (shouldSupportAggregation) {
- assertNotSupportedInStreamingPlan(
- s"$outputMode output mode - no aggregation",
- streamRelation.where($"a" > 1),
- outputMode = outputMode,
- Seq("aggregation", s"$outputMode output mode"))
-
assertSupportedInStreamingPlan(
s"$outputMode output mode - aggregation",
streamRelation.groupBy("a")("count(*)"),
outputMode = outputMode)
-
} else {
+ assertNotSupportedInStreamingPlan(
+ s"$outputMode output mode - aggregation",
+ streamRelation.groupBy("a")("count(*)"),
+ outputMode = outputMode,
+ Seq("aggregation", s"$outputMode output mode"))
+ }
+
+ // non aggregation
+ if (shouldSupportNonAggregation) {
assertSupportedInStreamingPlan(
s"$outputMode output mode - no aggregation",
streamRelation.where($"a" > 1),
outputMode = outputMode)
-
+ } else {
assertNotSupportedInStreamingPlan(
- s"$outputMode output mode - aggregation",
- streamRelation.groupBy("a")("count(*)"),
+ s"$outputMode output mode - no aggregation",
+ streamRelation.where($"a" > 1),
outputMode = outputMode,
Seq("aggregation", s"$outputMode output mode"))
}