-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala | 23
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 145
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala | 18
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 384
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 34
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala | 3
16 files changed, 685 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala
new file mode 100644
index 0000000000..a4d387eae3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+sealed trait OutputMode
+
+case object Append extends OutputMode
+case object Update extends OutputMode
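
Note: OutputMode is a small sealed ADT with exactly two values, so downstream code can pattern-match on it exhaustively. A minimal sketch of such a consumer, assuming the classes added above are on the classpath; describeMode is a hypothetical helper, not part of this patch, and the wording of each description follows the restrictions enforced by the checker below:

    import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode, Update}

    // Hypothetical helper: the sealed trait lets the compiler verify the match is exhaustive.
    def describeMode(mode: OutputMode): String = mode match {
      case Append => "only append new result rows; streaming aggregations are rejected"
      case Update => "allow previously emitted results to be updated, e.g. running aggregates"
    }
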
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
new file mode 100644
index 0000000000..aadc1d31bd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+
+/**
+ * Analyzes the presence of unsupported operations in a logical plan.
+ */
+object UnsupportedOperationChecker {
+
+ def checkForBatch(plan: LogicalPlan): Unit = {
+ plan.foreachUp {
+ case p if p.isStreaming =>
+ throwError(
+ "Queries with streaming sources must be executed with write.startStream()")(p)
+
+ case _ =>
+ }
+ }
+
+ def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
+
+ if (!plan.isStreaming) {
+ throwError(
+ "Queries without streaming sources cannot be executed with write.startStream()")(plan)
+ }
+
+ plan.foreachUp { implicit plan =>
+
+ // Operations that cannot exist anywhere in a streaming plan
+ plan match {
+
+ case _: Command =>
+ throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
+ "streaming DataFrames/Datasets")
+
+ case _: InsertIntoTable =>
+ throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")
+
+ case Aggregate(_, _, child) if child.isStreaming && outputMode == Append =>
+ throwError(
+ "Aggregations are not supported on streaming DataFrames/Datasets in " +
+ "Append output mode. Consider changing output mode to Update.")
+
+ case Join(left, right, joinType, _) =>
+
+ joinType match {
+
+ case Inner =>
+ if (left.isStreaming && right.isStreaming) {
+ throwError("Inner join between two streaming DataFrames/Datasets is not supported")
+ }
+
+ case FullOuter =>
+ if (left.isStreaming || right.isStreaming) {
+ throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
+ }
+
+
+ case LeftOuter | LeftSemi | LeftAnti =>
+ if (right.isStreaming) {
+ throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
+ "on the right is not supported")
+ }
+
+ case RightOuter =>
+ if (left.isStreaming) {
+ throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
+ "not supported")
+ }
+
+ case NaturalJoin(_) | UsingJoin(_, _) =>
+ // They should not appear in an analyzed plan.
+
+ case _ =>
+ throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset")
+ }
+
+ case c: CoGroup if c.children.exists(_.isStreaming) =>
+ throwError("CoGrouping with a streaming DataFrame/Dataset is not supported")
+
+ case u: Union if u.children.map(_.isStreaming).distinct.size == 2 =>
+ throwError("Union between streaming and batch DataFrames/Datasets is not supported")
+
+ case Except(left, right) if right.isStreaming =>
+ throwError("Except with a streaming DataFrame/Dataset on the right is not supported")
+
+ case Intersect(left, right) if left.isStreaming && right.isStreaming =>
+ throwError("Intersect between two streaming DataFrames/Datasets is not supported")
+
+ case GroupingSets(_, _, child, _) if child.isStreaming =>
+ throwError("GroupingSets is not supported on streaming DataFrames/Datasets")
+
+ case GlobalLimit(_, _) | LocalLimit(_, _) if plan.children.forall(_.isStreaming) =>
+ throwError("Limits are not supported on streaming DataFrames/Datasets")
+
+ case Sort(_, _, _) | SortPartitions(_, _) if plan.children.forall(_.isStreaming) =>
+ throwError("Sorting is not supported on streaming DataFrames/Datasets")
+
+ case Sample(_, _, _, _, child) if child.isStreaming =>
+ throwError("Sampling is not supported on streaming DataFrames/Datasets")
+
+ case Window(_, _, _, child) if child.isStreaming =>
+ throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets")
+
+ case ReturnAnswer(child) if child.isStreaming =>
+ throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
+ "with streaming DataFrames/Datasets must be executed with write.startStream().")
+
+ case _ =>
+ }
+ }
+ }
+
+ private def throwErrorIf(
+ condition: Boolean,
+ msg: String)(implicit operator: LogicalPlan): Unit = {
+ if (condition) {
+ throwError(msg)
+ }
+ }
+
+ private def throwError(msg: String)(implicit operator: LogicalPlan): Nothing = {
+ throw new AnalysisException(
+ msg, operator.origin.line, operator.origin.startPosition, Some(operator))
+ }
+}
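
Note: the checker is invoked on an analyzed logical plan together with the intended output mode. A minimal sketch of calling it directly, mirroring the new UnsupportedOperationsSuite further down in this patch; TestStreamingRelation is a hypothetical stand-in for a real streaming source node, and relies on the isStreaming hook added to LogicalPlan below:

    import org.apache.spark.sql.catalyst.analysis.{Append, Update, UnsupportedOperationChecker}
    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation}
    import org.apache.spark.sql.types.IntegerType

    // Leaf node that marks itself as streaming, like StreamingRelation does in sql/core.
    case class TestStreamingRelation(output: Seq[Attribute]) extends LeafNode {
      override def isStreaming: Boolean = true
    }

    val attr       = AttributeReference("a", IntegerType, nullable = true)()
    val batchPlan  = LocalRelation(attr)
    val streamPlan = TestStreamingRelation(Seq(attr))

    // Passes: a plain streaming relation is fine in either output mode.
    UnsupportedOperationChecker.checkForStreaming(streamPlan, Update)
    // Would throw AnalysisException: a purely batch plan must not use write.startStream().
    // UnsupportedOperationChecker.checkForStreaming(batchPlan, Append)
    // Would throw AnalysisException: a streaming plan cannot be executed as a batch query.
    // UnsupportedOperationChecker.checkForBatch(streamPlan)
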
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 1e7296664b..958966328b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -297,6 +297,24 @@ package object dsl {
condition: Option[Expression] = None): LogicalPlan =
Join(logicalPlan, otherPlan, joinType, condition)
+ def cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result: Encoder](
+ otherPlan: LogicalPlan,
+ func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+ leftGroup: Seq[Attribute],
+ rightGroup: Seq[Attribute],
+ leftAttr: Seq[Attribute],
+ rightAttr: Seq[Attribute]
+ ): LogicalPlan = {
+ CoGroup.apply[Key, Left, Right, Result](
+ func,
+ leftGroup,
+ rightGroup,
+ leftAttr,
+ rightAttr,
+ logicalPlan,
+ otherPlan)
+ }
+
def orderBy(sortExprs: SortOrder*): LogicalPlan = Sort(sortExprs, true, logicalPlan)
def sortBy(sortExprs: SortOrder*): LogicalPlan = Sort(sortExprs, false, logicalPlan)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index aceeb8aadc..45ac126a72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -42,6 +42,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
def analyzed: Boolean = _analyzed
+ /** Returns true if this subtree contains any streaming data sources. */
+ def isStreaming: Boolean = children.exists(_.isStreaming)
+
/**
* Returns a copy of this node where `rule` has been recursively applied first to all of its
* children and then itself (post-order). When `rule` does not apply to a given node, it is left
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
new file mode 100644
index 0000000000..ce00a03e76
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
+
+class UnsupportedOperationsSuite extends SparkFunSuite {
+
+ val attribute = AttributeReference("a", IntegerType, nullable = true)()
+ val batchRelation = LocalRelation(attribute)
+ val streamRelation = new TestStreamingRelation(attribute)
+
+ /*
+ =======================================================================================
+ BATCH QUERIES
+ =======================================================================================
+ */
+
+ assertSupportedInBatchPlan("local relation", batchRelation)
+
+ assertNotSupportedInBatchPlan(
+ "streaming source",
+ streamRelation,
+ Seq("with streaming source", "startStream"))
+
+ assertNotSupportedInBatchPlan(
+ "select on streaming source",
+ streamRelation.select($"count(*)"),
+ Seq("with streaming source", "startStream"))
+
+
+ /*
+ =======================================================================================
+ STREAMING QUERIES
+ =======================================================================================
+ */
+
+ // Batch plan in streaming query
+ testError(
+ "streaming plan - no streaming source",
+ Seq("without streaming source", "startStream")) {
+ UnsupportedOperationChecker.checkForStreaming(batchRelation.select($"count(*)"), Append)
+ }
+
+ // Commands
+ assertNotSupportedInStreamingPlan(
+ "commmands",
+ DescribeFunction("func", true),
+ outputMode = Append,
+ expectedMsgs = "commands" :: Nil)
+
+ // Aggregates: Not supported on streams in Append mode
+ assertSupportedInStreamingPlan(
+ "aggregate - batch with update output mode",
+ batchRelation.groupBy("a")("count(*)"),
+ outputMode = Update)
+
+ assertSupportedInStreamingPlan(
+ "aggregate - batch with append output mode",
+ batchRelation.groupBy("a")("count(*)"),
+ outputMode = Append)
+
+ assertSupportedInStreamingPlan(
+ "aggregate - stream with update output mode",
+ streamRelation.groupBy("a")("count(*)"),
+ outputMode = Update)
+
+ assertNotSupportedInStreamingPlan(
+ "aggregate - stream with append output mode",
+ streamRelation.groupBy("a")("count(*)"),
+ outputMode = Append,
+ Seq("aggregation", "append output mode"))
+
+ // Inner joins: Stream-stream not supported
+ testBinaryOperationInStreamingPlan(
+ "inner join",
+ _.join(_, joinType = Inner),
+ streamStreamSupported = false)
+
+ // Full outer joins: only batch-batch is allowed
+ testBinaryOperationInStreamingPlan(
+ "full outer join",
+ _.join(_, joinType = FullOuter),
+ streamStreamSupported = false,
+ batchStreamSupported = false,
+ streamBatchSupported = false)
+
+ // Left outer joins: *-stream not allowed
+ testBinaryOperationInStreamingPlan(
+ "left outer join",
+ _.join(_, joinType = LeftOuter),
+ streamStreamSupported = false,
+ batchStreamSupported = false,
+ expectedMsg = "left outer/semi/anti joins")
+
+ // Left semi joins: *-stream not allowed
+ testBinaryOperationInStreamingPlan(
+ "left semi join",
+ _.join(_, joinType = LeftSemi),
+ streamStreamSupported = false,
+ batchStreamSupported = false,
+ expectedMsg = "left outer/semi/anti joins")
+
+ // Left anti joins: *-stream not allowed
+ testBinaryOperationInStreamingPlan(
+ "left anti join",
+ _.join(_, joinType = LeftAnti),
+ streamStreamSupported = false,
+ batchStreamSupported = false,
+ expectedMsg = "left outer/semi/anti joins")
+
+ // Right outer joins: stream-* not allowed
+ testBinaryOperationInStreamingPlan(
+ "right outer join",
+ _.join(_, joinType = RightOuter),
+ streamStreamSupported = false,
+ streamBatchSupported = false)
+
+ // Cogroup: only batch-batch is allowed
+ testBinaryOperationInStreamingPlan(
+ "cogroup",
+ genCogroup,
+ streamStreamSupported = false,
+ batchStreamSupported = false,
+ streamBatchSupported = false)
+
+ def genCogroup(left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
+ def func(k: Int, left: Iterator[Int], right: Iterator[Int]): Iterator[Int] = {
+ Iterator.empty
+ }
+ implicit val intEncoder = ExpressionEncoder[Int]
+
+ left.cogroup[Int, Int, Int, Int](
+ right,
+ func,
+ AppendColumns[Int, Int]((x: Int) => x, left).newColumns,
+ AppendColumns[Int, Int]((x: Int) => x, right).newColumns,
+ left.output,
+ right.output)
+ }
+
+ // Union: Mixing between stream and batch not supported
+ testBinaryOperationInStreamingPlan(
+ "union",
+ _.union(_),
+ streamBatchSupported = false,
+ batchStreamSupported = false)
+
+ // Except: *-stream not supported
+ testBinaryOperationInStreamingPlan(
+ "except",
+ _.except(_),
+ streamStreamSupported = false,
+ batchStreamSupported = false)
+
+ // Intersect: stream-stream not supported
+ testBinaryOperationInStreamingPlan(
+ "intersect",
+ _.intersect(_),
+ streamStreamSupported = false)
+
+
+ // Unary operations
+ testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
+ testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
+ testUnaryOperatorInStreamingPlan(
+ "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
+ testUnaryOperatorInStreamingPlan(
+ "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+
+
+ /*
+ =======================================================================================
+ TESTING FUNCTIONS
+ =======================================================================================
+ */
+
+ /**
+ * Test that a unary operator correctly fails the support check when it has a streaming child plan,
+ * but not when it has a batch child plan. There can be batch sub-plans inside a streaming plan,
+ * so it is valid for the operator to have a batch child plan.
+ *
+ * This test wraps the logical plan in a fake operator that makes the whole plan look like
+ * a streaming plan even if the child plan is a batch plan. This is to test that the operator
+ * supports having a batch child plan, forming a batch subplan inside a streaming plan.
+ */
+ def testUnaryOperatorInStreamingPlan(
+ operationName: String,
+ logicalPlanGenerator: LogicalPlan => LogicalPlan,
+ outputMode: OutputMode = Append,
+ expectedMsg: String = ""): Unit = {
+
+ val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)
+
+ assertNotSupportedInStreamingPlan(
+ s"$operationName with stream relation",
+ wrapInStreaming(logicalPlanGenerator(streamRelation)),
+ outputMode,
+ expectedMsgs)
+
+ assertSupportedInStreamingPlan(
+ s"$operationName with batch relation",
+ wrapInStreaming(logicalPlanGenerator(batchRelation)),
+ outputMode)
+ }
+
+
+ /**
+ * Test that a binary operator correctly fails the support check when it has combinations of
+ * streaming and batch child plans. There can be batch sub-plans inside a streaming plan,
+ * so it is valid for the operator to have a batch child plan.
+ */
+ def testBinaryOperationInStreamingPlan(
+ operationName: String,
+ planGenerator: (LogicalPlan, LogicalPlan) => LogicalPlan,
+ outputMode: OutputMode = Append,
+ streamStreamSupported: Boolean = true,
+ streamBatchSupported: Boolean = true,
+ batchStreamSupported: Boolean = true,
+ expectedMsg: String = ""): Unit = {
+
+ val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)
+
+ if (streamStreamSupported) {
+ assertSupportedInStreamingPlan(
+ s"$operationName with stream-stream relations",
+ planGenerator(streamRelation, streamRelation),
+ outputMode)
+ } else {
+ assertNotSupportedInStreamingPlan(
+ s"$operationName with stream-stream relations",
+ planGenerator(streamRelation, streamRelation),
+ outputMode,
+ expectedMsgs)
+ }
+
+ if (streamBatchSupported) {
+ assertSupportedInStreamingPlan(
+ s"$operationName with stream-batch relations",
+ planGenerator(streamRelation, batchRelation),
+ outputMode)
+ } else {
+ assertNotSupportedInStreamingPlan(
+ s"$operationName with stream-batch relations",
+ planGenerator(streamRelation, batchRelation),
+ outputMode,
+ expectedMsgs)
+ }
+
+ if (batchStreamSupported) {
+ assertSupportedInStreamingPlan(
+ s"$operationName with batch-stream relations",
+ planGenerator(batchRelation, streamRelation),
+ outputMode)
+ } else {
+ assertNotSupportedInStreamingPlan(
+ s"$operationName with batch-stream relations",
+ planGenerator(batchRelation, streamRelation),
+ outputMode,
+ expectedMsgs)
+ }
+
+ assertSupportedInStreamingPlan(
+ s"$operationName with batch-batch relations",
+ planGenerator(batchRelation, batchRelation),
+ outputMode)
+ }
+
+ /**
+ * Assert that the logical plan is supported as a subplan inside a streaming plan.
+ *
+ * To test this correctly, the given logical plan is wrapped in a fake operator that makes the
+ * whole plan look like a streaming plan. Otherwise, a batch plan may throw a not-supported
+ * exception simply for not being a streaming plan, even though that plan could exist as a batch
+ * subplan inside some streaming plan.
+ */
+ def assertSupportedInStreamingPlan(
+ name: String,
+ plan: LogicalPlan,
+ outputMode: OutputMode): Unit = {
+ test(s"streaming plan - $name: supported") {
+ UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
+ }
+ }
+
+ /**
+ * Assert that the logical plan is not supported inside a streaming plan.
+ *
+ * To test this correctly, the given logical plan is wrapped in a fake operator that makes the
+ * whole plan look like a streaming plan. Otherwise, a batch plan may throw a not-supported
+ * exception simply for not being a streaming plan, even though that plan could exist as a batch
+ * subplan inside some streaming plan.
+ */
+ def assertNotSupportedInStreamingPlan(
+ name: String,
+ plan: LogicalPlan,
+ outputMode: OutputMode,
+ expectedMsgs: Seq[String]): Unit = {
+ testError(
+ s"streaming plan - $name: not supported",
+ expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported") {
+ UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
+ }
+ }
+
+ /** Assert that the logical plan is supported as a batch plan */
+ def assertSupportedInBatchPlan(name: String, plan: LogicalPlan): Unit = {
+ test(s"batch plan - $name: supported") {
+ UnsupportedOperationChecker.checkForBatch(plan)
+ }
+ }
+
+ /** Assert that the logical plan is not supported as a batch plan */
+ def assertNotSupportedInBatchPlan(
+ name: String,
+ plan: LogicalPlan,
+ expectedMsgs: Seq[String]): Unit = {
+ testError(s"batch plan - $name: not supported", expectedMsgs) {
+ UnsupportedOperationChecker.checkForBatch(plan)
+ }
+ }
+
+ /**
+ * Test whether the body of code will fail. If it does fail, then check whether it contains the
+ * expected messages.
+ */
+ def testError(testName: String, expectedMsgs: Seq[String])(testBody: => Unit): Unit = {
+
+ test(testName) {
+ val e = intercept[AnalysisException] {
+ testBody
+ }
+
+ if (!expectedMsgs.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains)) {
+ fail(
+ s"""Exception message should contain the following substrings:
+ |
+ | ${expectedMsgs.mkString("\n ")}
+ |
+ |Actual exception message:
+ |
+ | ${e.getMessage}
+ """.stripMargin)
+ }
+ }
+ }
+
+ def wrapInStreaming(plan: LogicalPlan): LogicalPlan = {
+ new StreamingPlanWrapper(plan)
+ }
+
+ case class StreamingPlanWrapper(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+ override def isStreaming: Boolean = true
+ }
+
+ case class TestStreamingRelation(output: Seq[Attribute]) extends LeafNode {
+ def this(attribute: Attribute) = this(Seq(attribute))
+ override def isStreaming: Boolean = true
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index faef9ed274..cc86f1f6e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
/**
* This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly
@@ -68,4 +70,23 @@ class LogicalPlanSuite extends SparkFunSuite {
assert(invocationCount === 1)
}
+
+ test("isStreaming") {
+ val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
+ val incrementalRelation = new LocalRelation(
+ Seq(AttributeReference("a", IntegerType, nullable = true)())) {
+ override def isStreaming(): Boolean = true
+ }
+
+ case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+ override def output: Seq[Attribute] = left.output ++ right.output
+ }
+
+ require(relation.isStreaming === false)
+ require(incrementalRelation.isStreaming === true)
+ assert(TestBinaryRelation(relation, relation).isStreaming === false)
+ assert(TestBinaryRelation(incrementalRelation, relation).isStreaming === true)
+ assert(TestBinaryRelation(relation, incrementalRelation).isStreaming === true)
+ assert(TestBinaryRelation(incrementalRelation, incrementalRelation).isStreaming)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 1343e81569..39d04ed8c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql
import scala.collection.mutable
import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode, UnsupportedOperationChecker}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.ContinuousQueryListener
/**
@@ -172,14 +174,23 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
checkpointLocation: String,
df: DataFrame,
sink: Sink,
- trigger: Trigger = ProcessingTime(0)): ContinuousQuery = {
+ trigger: Trigger = ProcessingTime(0),
+ outputMode: OutputMode = Append): ContinuousQuery = {
activeQueriesLock.synchronized {
if (activeQueries.contains(name)) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
}
+ val analyzedPlan = df.queryExecution.analyzed
+ df.queryExecution.assertAnalyzed()
+
+ if (sqlContext.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+ UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+ }
+
var nextSourceId = 0L
- val logicalPlan = df.logicalPlan.transform {
+
+ val logicalPlan = analyzedPlan.transform {
case StreamingRelation(dataSource, _, output) =>
// Materialize source to avoid creating it in every batch
val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
@@ -195,6 +206,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
checkpointLocation,
logicalPlan,
sink,
+ outputMode,
trigger)
query.start()
activeQueries.put(name, query)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fb3e184a64..1a09d70fb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -461,9 +461,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
@Experimental
- def isStreaming: Boolean = logicalPlan.find { n =>
- n.isInstanceOf[StreamingRelation] || n.isInstanceOf[StreamingExecutionRelation]
- }.isDefined
+ def isStreaming: Boolean = logicalPlan.isStreaming
/**
* Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index f5e1e77263..ddcae0fe07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.internal.SQLConf
/**
* The primary workflow for executing relational queries using Spark. Designed to allow easy
@@ -43,10 +45,17 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
throw ae
}
+ def assertSupported(): Unit = {
+ if (sqlContext.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+ UnsupportedOperationChecker.checkForBatch(analyzed)
+ }
+ }
+
lazy val analyzed: LogicalPlan = sqlContext.sessionState.analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
+ assertSupported()
sqlContext.cacheManager.useCachedData(analyzed)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index aaced49dd1..81244ed874 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -18,9 +18,11 @@
package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.analysis.{OutputMode, UnsupportedOperationChecker}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryNode}
+import org.apache.spark.sql.internal.SQLConf
/**
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
@@ -29,6 +31,7 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
class IncrementalExecution(
ctx: SQLContext,
logicalPlan: LogicalPlan,
+ outputMode: OutputMode,
checkpointLocation: String,
currentBatchId: Long) extends QueryExecution(ctx, logicalPlan) {
@@ -69,4 +72,7 @@ class IncrementalExecution(
}
override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations
+
+ /** No need to assert supported, as this check has already been done */
+ override def assertSupported(): Unit = { }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 87dd27a2b1..2a1fa1ba62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.OutputMode
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
@@ -48,6 +49,7 @@ class StreamExecution(
checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
val sink: Sink,
+ val outputMode: OutputMode,
val trigger: Trigger) extends ContinuousQuery with Logging {
/** An monitor used to wait/notify when batches complete. */
@@ -314,8 +316,13 @@ class StreamExecution(
}
val optimizerStart = System.nanoTime()
- lastExecution =
- new IncrementalExecution(sqlContext, newPlan, checkpointFile("state"), currentBatchId)
+ lastExecution = new IncrementalExecution(
+ sqlContext,
+ newPlan,
+ outputMode,
+ checkpointFile("state"),
+ currentBatchId)
+
lastExecution.executedPlan
val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
logDebug(s"Optimized batch in ${optimizerTime}ms")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index d2872e49ce..c29291eb58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -37,6 +37,7 @@ object StreamingRelation {
*/
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
extends LeafNode {
+ override def isStreaming: Boolean = true
override def toString: String = sourceName
}
@@ -45,6 +46,7 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
*/
case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+ override def isStreaming: Boolean = true
override def toString: String = source.toString
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 70e18cebdd..7f206bdb9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -442,6 +442,14 @@ object SQLConf {
.stringConf
.createOptional
+ val UNSUPPORTED_OPERATION_CHECK_ENABLED =
+ SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
+ .internal()
+ .doc("When true, the logical plan for continuous query will be checked for unsupported" +
+ " operations.")
+ .booleanConf
+ .createWithDefault(true)
+
// TODO: This is still WIP and shouldn't be turned on without extensive test coverage
val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled")
.internal()
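
Note: the new flag is registered as an internal conf, but it can be toggled like any other SQL conf, which is how tests bypass the checker when they deliberately build plans it would reject. A minimal sketch, assuming an existing SQLContext named sqlContext:

    // Disable the new check, e.g. in a test that constructs an otherwise unsupported streaming plan.
    sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", "false")
    // ... run the query under test ...
    // Re-enable it so subsequent queries are validated again.
    sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", "true")
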
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 6ccc99fe17..242ea9cb27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -33,6 +33,7 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
+import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode}
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
@@ -75,6 +76,8 @@ trait StreamTest extends QueryTest with Timeouts {
/** How long to wait for an active stream to catch up when checking a result. */
val streamingTimeout = 10.seconds
+ val outputMode: OutputMode = Append
+
/** A trait for actions that can be performed while testing a streaming DataFrame. */
trait StreamAction
@@ -228,6 +231,7 @@ trait StreamTest extends QueryTest with Timeouts {
|$testActions
|
|== Stream ==
+ |Output Mode: $outputMode
|Stream state: $currentOffsets
|Thread state: $threadState
|${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
@@ -235,6 +239,7 @@ trait StreamTest extends QueryTest with Timeouts {
|== Sink ==
|${sink.toDebugString}
|
+ |
|== Plan ==
|${if (currentStream != null) currentStream.lastExecution else ""}
""".stripMargin
@@ -293,7 +298,8 @@ trait StreamTest extends QueryTest with Timeouts {
StreamExecution.nextName,
metadataRoot,
stream,
- sink)
+ sink,
+ outputMode = outputMode)
.asInstanceOf[StreamExecution]
currentStream.microBatchThread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 2bd27c7efd..6f3149dbc5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.streaming
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, StreamTest}
+import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -108,6 +107,35 @@ class StreamSuite extends StreamTest with SharedSQLContext {
assertDF(df)
assertDF(df)
}
+
+ test("unsupported queries") {
+ val streamInput = MemoryStream[Int]
+ val batchInput = Seq(1, 2, 3).toDS()
+
+ def assertError(expectedMsgs: Seq[String])(body: => Unit): Unit = {
+ val e = intercept[AnalysisException] {
+ body
+ }
+ expectedMsgs.foreach { s => assert(e.getMessage.contains(s)) }
+ }
+
+ // Running a streaming plan as a batch query
+ assertError("startStream" :: Nil) {
+ streamInput.toDS.map { i => i }.count()
+ }
+
+ // Running a non-streaming plan as a streaming query
+ assertError("without streaming sources" :: "startStream" :: Nil) {
+ val ds = batchInput.map { i => i }
+ testStream(ds)()
+ }
+
+ // Running a streaming plan that cannot be incrementalized
+ assertError("not supported" :: "streaming" :: Nil) {
+ val ds = streamInput.toDS.map { i => i }.sort()
+ testStream(ds)()
+ }
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 3af7c01e52..fa3b122f6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
import org.apache.spark.SparkException
import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.catalyst.analysis.Update
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.expressions.scala.typed
import org.apache.spark.sql.functions._
@@ -32,6 +33,8 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
import testImplicits._
+ override val outputMode = Update
+
test("simple count") {
val inputData = MemoryStream[Int]