aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-05-31 15:57:01 -0700
committerMichael Armbrust <michael@databricks.com>2016-05-31 15:57:01 -0700
commit90b11439b3d4540f48985e87dcc99749f0369287 (patch)
treedeab5a578c9fa2044764c2e8c0b34d1a6bfdbef9 /sql/catalyst
parentdfe2cbeb437a4fa69bec3eca4ac9242f3eb51c81 (diff)
downloadspark-90b11439b3d4540f48985e87dcc99749f0369287.tar.gz
spark-90b11439b3d4540f48985e87dcc99749f0369287.tar.bz2
spark-90b11439b3d4540f48985e87dcc99749f0369287.zip
[SPARK-15517][SQL][STREAMING] Add support for complete output mode in Structured Streaming
## What changes were proposed in this pull request? Currently structured streaming only supports append output mode. This PR adds the following. - Added support for Complete output mode in the internal state store, analyzer and planner. - Added public API in Scala and Python for users to specify output mode - Added checks for unsupported combinations of output mode and DF operations - Plans with no aggregation should support only Append mode - Plans with aggregation should support only Update and Complete modes - Default output mode is Append mode (**Question: should we change this to automatically set to Complete mode when there is aggregation?**) - Added support for Complete output mode in Memory Sink. So Memory Sink internally supports Append, Complete, and Update modes. But from the public API only Complete and Append output modes are supported. ## How was this patch tested? Unit tests in various test suites - StreamingAggregationSuite: tests for complete mode - MemorySinkSuite: tests for checking behavior in Append and Complete modes. - UnsupportedOperationSuite: tests for checking unsupported combinations of DF ops and output modes - DataFrameReaderWriterSuite: tests for checking that output mode cannot be called on static DFs - Python doc test and existing unit tests modified to call write.outputMode. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13286 from tdas/complete-mode.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java54
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala45
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala49
-rw-r--r--sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java (renamed from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala)16
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala63
5 files changed, 176 insertions, 51 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java
new file mode 100644
index 0000000000..1936d53e5e
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * :: Experimental ::
+ *
+ * OutputMode is used to specify what data will be written to a streaming sink when there is
+ * new data available in a streaming DataFrame/Dataset.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+public class OutputMode {
+
+ /**
+ * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
+ * written to the sink. This output mode can only be used in queries that do not
+ * contain any aggregation.
+ *
+ * @since 2.0.0
+ */
+ public static OutputMode Append() {
+ return InternalOutputModes.Append$.MODULE$;
+ }
+
+ /**
+ * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
+ * to the sink every time there are some updates. This output mode can only be used in queries
+ * that contain aggregations.
+ *
+ * @since 2.0.0
+ */
+ public static OutputMode Complete() {
+ return InternalOutputModes.Complete$.MODULE$;
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
new file mode 100644
index 0000000000..8ef5d9a653
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+/**
+ * Internal helper class to generate objects representing various [[OutputMode]]s.
+ */
+private[sql] object InternalOutputModes {
+
+ /**
+ * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
+ * written to the sink. This output mode can only be used in queries that do not
+ * contain any aggregation.
+ */
+ case object Append extends OutputMode
+
+ /**
+ * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
+ * to the sink every time there are some updates. This output mode can only be used in queries
+ * that contain aggregations.
+ */
+ case object Complete extends OutputMode
+
+ /**
+ * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
+ * written to the sink every time there are some updates. This output mode can only be used in
+ * queries that contain aggregations.
+ */
+ case object Update extends OutputMode
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 0e08bf013c..f4c0347609 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -29,8 +29,7 @@ object UnsupportedOperationChecker {
def checkForBatch(plan: LogicalPlan): Unit = {
plan.foreachUp {
case p if p.isStreaming =>
- throwError(
- "Queries with streaming sources must be executed with write.startStream()")(p)
+ throwError("Queries with streaming sources must be executed with write.startStream()")(p)
case _ =>
}
@@ -43,10 +42,10 @@ object UnsupportedOperationChecker {
"Queries without streaming sources cannot be executed with write.startStream()")(plan)
}
- plan.foreachUp { implicit plan =>
+ plan.foreachUp { implicit subPlan =>
// Operations that cannot exists anywhere in a streaming plan
- plan match {
+ subPlan match {
case _: Command =>
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
@@ -55,21 +54,6 @@ object UnsupportedOperationChecker {
case _: InsertIntoTable =>
throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")
- case Aggregate(_, _, child) if child.isStreaming =>
- if (outputMode == Append) {
- throwError(
- "Aggregations are not supported on streaming DataFrames/Datasets in " +
- "Append output mode. Consider changing output mode to Update.")
- }
- val moreStreamingAggregates = child.find {
- case Aggregate(_, _, grandchild) if grandchild.isStreaming => true
- case _ => false
- }
- if (moreStreamingAggregates.nonEmpty) {
- throwError("Multiple streaming aggregations are not supported with " +
- "streaming DataFrames/Datasets")
- }
-
case Join(left, right, joinType, _) =>
joinType match {
@@ -119,10 +103,10 @@ object UnsupportedOperationChecker {
case GroupingSets(_, _, child, _) if child.isStreaming =>
throwError("GroupingSets is not supported on streaming DataFrames/Datasets")
- case GlobalLimit(_, _) | LocalLimit(_, _) if plan.children.forall(_.isStreaming) =>
+ case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
throwError("Limits are not supported on streaming DataFrames/Datasets")
- case Sort(_, _, _) | SortPartitions(_, _) if plan.children.forall(_.isStreaming) =>
+ case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets")
case Sample(_, _, _, _, child) if child.isStreaming =>
@@ -138,6 +122,27 @@ object UnsupportedOperationChecker {
case _ =>
}
}
+
+ // Checks related to aggregations
+ val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
+ outputMode match {
+ case InternalOutputModes.Append if aggregates.nonEmpty =>
+ throwError(
+ s"$outputMode output mode not supported when there are streaming aggregations on " +
+ s"streaming DataFrames/DataSets")(plan)
+
+ case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+ throwError(
+ s"$outputMode output mode not supported when there are no streaming aggregations on " +
+ s"streaming DataFrames/Datasets")(plan)
+
+ case _ =>
+ }
+ if (aggregates.size > 1) {
+ throwError(
+ "Multiple streaming aggregations are not supported with " +
+ "streaming DataFrames/Datasets")(plan)
+ }
}
private def throwErrorIf(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
index a4d387eae3..1764f3348d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
@@ -15,9 +15,17 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.analysis
+package org.apache.spark.sql;
-sealed trait OutputMode
+import org.junit.Test;
-case object Append extends OutputMode
-case object Update extends OutputMode
+public class JavaOutputModeSuite {
+
+ @Test
+ public void testOutputModes() {
+ OutputMode o1 = OutputMode.Append();
+ assert(o1.toString().toLowerCase().contains("append"));
+ OutputMode o2 = OutputMode.Complete();
+ assert (o2.toString().toLowerCase().contains("complete"));
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index aaeee0f2a4..c2e3d47450 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, OutputMode}
+import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -79,35 +80,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = "commands" :: Nil)
- // Aggregates: Not supported on streams in Append mode
- assertSupportedInStreamingPlan(
- "aggregate - batch with update output mode",
- batchRelation.groupBy("a")("count(*)"),
- outputMode = Update)
-
- assertSupportedInStreamingPlan(
- "aggregate - batch with append output mode",
- batchRelation.groupBy("a")("count(*)"),
- outputMode = Append)
-
- assertSupportedInStreamingPlan(
- "aggregate - stream with update output mode",
- streamRelation.groupBy("a")("count(*)"),
- outputMode = Update)
-
- assertNotSupportedInStreamingPlan(
- "aggregate - stream with append output mode",
- streamRelation.groupBy("a")("count(*)"),
- outputMode = Append,
- Seq("aggregation", "append output mode"))
-
// Multiple streaming aggregations not supported
def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
assertSupportedInStreamingPlan(
"aggregate - multiple batch aggregations",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), batchRelation)),
- Update)
+ Append)
assertSupportedInStreamingPlan(
"aggregate - multiple aggregations but only one streaming aggregation",
@@ -209,7 +188,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.intersect(_),
streamStreamSupported = false)
-
// Unary operations
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
@@ -218,6 +196,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
testUnaryOperatorInStreamingPlan(
"window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+ // Output modes with aggregation and non-aggregation plans
+ testOutputMode(Append, shouldSupportAggregation = false)
+ testOutputMode(Update, shouldSupportAggregation = true)
+ testOutputMode(Complete, shouldSupportAggregation = true)
/*
=======================================================================================
@@ -316,6 +298,37 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode)
}
+ def testOutputMode(
+ outputMode: OutputMode,
+ shouldSupportAggregation: Boolean): Unit = {
+
+ // aggregation
+ if (shouldSupportAggregation) {
+ assertNotSupportedInStreamingPlan(
+ s"$outputMode output mode - no aggregation",
+ streamRelation.where($"a" > 1),
+ outputMode = outputMode,
+ Seq("aggregation", s"$outputMode output mode"))
+
+ assertSupportedInStreamingPlan(
+ s"$outputMode output mode - aggregation",
+ streamRelation.groupBy("a")("count(*)"),
+ outputMode = outputMode)
+
+ } else {
+ assertSupportedInStreamingPlan(
+ s"$outputMode output mode - no aggregation",
+ streamRelation.where($"a" > 1),
+ outputMode = outputMode)
+
+ assertNotSupportedInStreamingPlan(
+ s"$outputMode output mode - aggregation",
+ streamRelation.groupBy("a")("count(*)"),
+ outputMode = outputMode,
+ Seq("aggregation", s"$outputMode output mode"))
+ }
+ }
+
/**
* Assert that the logical plan is supported as subplan insider a streaming plan.
*