aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-05-31 15:57:01 -0700
committerMichael Armbrust <michael@databricks.com>2016-05-31 15:57:01 -0700
commit90b11439b3d4540f48985e87dcc99749f0369287 (patch)
treedeab5a578c9fa2044764c2e8c0b34d1a6bfdbef9 /sql/catalyst/src/test
parentdfe2cbeb437a4fa69bec3eca4ac9242f3eb51c81 (diff)
downloadspark-90b11439b3d4540f48985e87dcc99749f0369287.tar.gz
spark-90b11439b3d4540f48985e87dcc99749f0369287.tar.bz2
spark-90b11439b3d4540f48985e87dcc99749f0369287.zip
[SPARK-15517][SQL][STREAMING] Add support for complete output mode in Structured Streaming
## What changes were proposed in this pull request? Currently structured streaming only supports append output mode. This PR adds the following. - Added support for Complete output mode in the internal state store, analyzer and planner. - Added public API in Scala and Python for users to specify output mode - Added checks for unsupported combinations of output mode and DF operations - Plans with no aggregation should support only Append mode - Plans with aggregation should support only Update and Complete modes - Default output mode is Append mode (**Question: should we change this to automatically set to Complete mode when there is aggregation?**) - Added support for Complete output mode in Memory Sink. So Memory Sink internally supports the Append, Complete, and Update modes, but from the public API only the Complete and Append output modes are supported. ## How was this patch tested? Unit tests in various test suites - StreamingAggregationSuite: tests for complete mode - MemorySinkSuite: tests for checking behavior in Append and Complete modes. - UnsupportedOperationSuite: tests for checking unsupported combinations of DF ops and output modes - DataFrameReaderWriterSuite: tests for checking that output mode cannot be called on static DFs - Python doc test and existing unit tests modified to call write.outputMode. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13286 from tdas/complete-mode.
Diffstat (limited to 'sql/catalyst/src/test')
-rw-r--r--sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java31
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala63
2 files changed, 69 insertions, 25 deletions
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
new file mode 100644
index 0000000000..1764f3348d
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.junit.Test;
+
+public class JavaOutputModeSuite {
+
+ @Test
+ public void testOutputModes() {
+ OutputMode o1 = OutputMode.Append();
+ assert(o1.toString().toLowerCase().contains("append"));
+ OutputMode o2 = OutputMode.Complete();
+ assert (o2.toString().toLowerCase().contains("complete"));
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index aaeee0f2a4..c2e3d47450 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, OutputMode}
+import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -79,35 +80,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = "commands" :: Nil)
- // Aggregates: Not supported on streams in Append mode
- assertSupportedInStreamingPlan(
- "aggregate - batch with update output mode",
- batchRelation.groupBy("a")("count(*)"),
- outputMode = Update)
-
- assertSupportedInStreamingPlan(
- "aggregate - batch with append output mode",
- batchRelation.groupBy("a")("count(*)"),
- outputMode = Append)
-
- assertSupportedInStreamingPlan(
- "aggregate - stream with update output mode",
- streamRelation.groupBy("a")("count(*)"),
- outputMode = Update)
-
- assertNotSupportedInStreamingPlan(
- "aggregate - stream with append output mode",
- streamRelation.groupBy("a")("count(*)"),
- outputMode = Append,
- Seq("aggregation", "append output mode"))
-
// Multiple streaming aggregations not supported
def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
assertSupportedInStreamingPlan(
"aggregate - multiple batch aggregations",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), batchRelation)),
- Update)
+ Append)
assertSupportedInStreamingPlan(
"aggregate - multiple aggregations but only one streaming aggregation",
@@ -209,7 +188,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.intersect(_),
streamStreamSupported = false)
-
// Unary operations
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
@@ -218,6 +196,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
testUnaryOperatorInStreamingPlan(
"window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+ // Output modes with aggregation and non-aggregation plans
+ testOutputMode(Append, shouldSupportAggregation = false)
+ testOutputMode(Update, shouldSupportAggregation = true)
+ testOutputMode(Complete, shouldSupportAggregation = true)
/*
=======================================================================================
@@ -316,6 +298,37 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode)
}
+ def testOutputMode(
+ outputMode: OutputMode,
+ shouldSupportAggregation: Boolean): Unit = {
+
+ // aggregation
+ if (shouldSupportAggregation) {
+ assertNotSupportedInStreamingPlan(
+ s"$outputMode output mode - no aggregation",
+ streamRelation.where($"a" > 1),
+ outputMode = outputMode,
+ Seq("aggregation", s"$outputMode output mode"))
+
+ assertSupportedInStreamingPlan(
+ s"$outputMode output mode - aggregation",
+ streamRelation.groupBy("a")("count(*)"),
+ outputMode = outputMode)
+
+ } else {
+ assertSupportedInStreamingPlan(
+ s"$outputMode output mode - no aggregation",
+ streamRelation.where($"a" > 1),
+ outputMode = outputMode)
+
+ assertNotSupportedInStreamingPlan(
+ s"$outputMode output mode - aggregation",
+ streamRelation.groupBy("a")("count(*)"),
+ outputMode = outputMode,
+ Seq("aggregation", s"$outputMode output mode"))
+ }
+ }
+
/**
* Assert that the logical plan is supported as subplan insider a streaming plan.
*