commit 1ffa608ba5a849739a56047bda8b157b86b08650 (patch)
tree c73fb186f96875267bf8ac60e95029bfad9a184b
parent 845e447fa03bf0a53ed79fa7e240af94dc152d2c (diff)
author Tathagata Das <tathagata.das1565@gmail.com> 2016-05-22 02:08:18 -0700
committer Tathagata Das <tathagata.das1565@gmail.com> 2016-05-22 02:08:18 -0700
[SPARK-15428][SQL] Disable multiple streaming aggregations
## What changes were proposed in this pull request?

Incrementalizing plans with multiple streaming aggregations is tricky, and we don't have the necessary support for "delta" to implement it correctly. So this disables support for multiple streaming aggregations.

## How was this patch tested?

Additional unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13210 from tdas/SPARK-15428.
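For reference, the shape of query this change rejects (adapted from the test removed from StreamingAggregationSuite below; assumes a SparkSession with its implicits and MemoryStream in scope):

    // Two chained aggregations over a stream: the second groupBy/agg
    // re-aggregates the output of the first streaming aggregation.
    // After this patch such a plan fails analysis with an AnalysisException.
    val inputData = MemoryStream[Int]
    val aggregated =
      inputData.toDF()
        .groupBy($"value")
        .agg(count("*") as 'count)    // first streaming aggregation
        .groupBy($"value" % 2)
        .agg(sum($"count"))           // second streaming aggregation: now rejected
        .as[(Int, Long)]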
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 18
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala  | 39
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala               | 19
3 files changed, 41 insertions(+), 35 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index aadc1d31bd..0e08bf013c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -55,10 +55,20 @@ object UnsupportedOperationChecker {
         case _: InsertIntoTable =>
           throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")
 
-        case Aggregate(_, _, child) if child.isStreaming && outputMode == Append =>
-          throwError(
-            "Aggregations are not supported on streaming DataFrames/Datasets in " +
-              "Append output mode. Consider changing output mode to Update.")
+        case Aggregate(_, _, child) if child.isStreaming =>
+          if (outputMode == Append) {
+            throwError(
+              "Aggregations are not supported on streaming DataFrames/Datasets in " +
+                "Append output mode. Consider changing output mode to Update.")
+          }
+          val moreStreamingAggregates = child.find {
+            case Aggregate(_, _, grandchild) if grandchild.isStreaming => true
+            case _ => false
+          }
+          if (moreStreamingAggregates.nonEmpty) {
+            throwError("Multiple streaming aggregations are not supported with " +
+              "streaming DataFrames/Datasets")
+          }
 
         case Join(left, right, joinType, _) =>
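The detection above relies on Catalyst's TreeNode.find, which returns the first node in the subtree (the node itself included) that satisfies a predicate. A minimal self-contained sketch of that logic, with hypothetical stand-ins (Plan, Agg, StreamSource) for the Catalyst classes:

    // Simplified model of the check: find() tests the node itself and then
    // its descendants, so any streaming Aggregate at or below `child` means
    // the query contains at least two streaming aggregations.
    sealed trait Plan {
      def isStreaming: Boolean
      def children: Seq[Plan]
      def find(f: Plan => Boolean): Option[Plan] =
        if (f(this)) Some(this) else children.view.flatMap(_.find(f)).headOption
    }
    case class Agg(child: Plan) extends Plan {
      def isStreaming: Boolean = child.isStreaming
      def children: Seq[Plan] = Seq(child)
    }
    case object StreamSource extends Plan {
      def isStreaming: Boolean = true
      def children: Seq[Plan] = Nil
    }

    // The outer Aggregate's child is itself an Aggregate over a stream.
    val child: Plan = Agg(StreamSource)
    val moreStreamingAggregates = child.find {
      case Agg(grandchild) if grandchild.isStreaming => true
      case _ => false
    }
    assert(moreStreamingAggregates.nonEmpty)  // -> "Multiple streaming aggregations"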
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 50baebe8bf..674277bdbe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.IntegerType
@@ -95,6 +96,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append,
     Seq("aggregation", "append output mode"))
 
+  // Multiple streaming aggregations not supported
+  def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
+
+  assertSupportedInStreamingPlan(
+    "aggregate - multiple batch aggregations",
+    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), batchRelation)),
+    Update)
+
+  assertSupportedInStreamingPlan(
+    "aggregate - multiple aggregations but only one streaming aggregation",
+    Aggregate(Nil, aggExprs("c"), batchRelation).join(
+      Aggregate(Nil, aggExprs("d"), streamRelation), joinType = Inner),
+    Update)
+
+  assertNotSupportedInStreamingPlan(
+    "aggregate - multiple streaming aggregations",
+    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), streamRelation)),
+    outputMode = Update,
+    expectedMsgs = Seq("multiple streaming aggregations"))
+
   // Inner joins: Stream-stream not supported
   testBinaryOperationInStreamingPlan(
     "inner join",
@@ -354,17 +375,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
       val e = intercept[AnalysisException] {
         testBody
       }
-
-      if (!expectedMsgs.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains)) {
-        fail(
-          s"""Exception message should contain the following substrings:
-             |
-             |  ${expectedMsgs.mkString("\n  ")}
-             |
-             |Actual exception message:
-             |
-             |  ${e.getMessage}
-           """.stripMargin)
+      expectedMsgs.foreach { m =>
+        if (!e.getMessage.toLowerCase.contains(m.toLowerCase)) {
+          fail(s"Exception message should contain: '$m', " +
+            s"actual exception message:\n\t'${e.getMessage}'")
+        }
       }
     }
   }
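The refactored assertion above fails fast on the first missing substring and names it directly, instead of checking all substrings at once. The same pattern as a standalone sketch (assertMessageContains is a hypothetical helper, not part of the suite):

    // Verify, case-insensitively, that every expected substring occurs in the
    // message; fail on (and name) the first one that is missing.
    def assertMessageContains(message: String, expectedMsgs: Seq[String]): Unit = {
      expectedMsgs.foreach { m =>
        if (!message.toLowerCase.contains(m.toLowerCase)) {
          throw new AssertionError(
            s"Exception message should contain: '$m', " +
              s"actual exception message:\n\t'$message'")
        }
      }
    }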
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 0f5fc9ca72..7104d01c4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -84,25 +84,6 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be
     )
   }
 
-  test("multiple aggregations") {
-    val inputData = MemoryStream[Int]
-
-    val aggregated =
-      inputData.toDF()
-        .groupBy($"value")
-        .agg(count("*") as 'count)
-        .groupBy($"value" % 2)
-        .agg(sum($"count"))
-        .as[(Int, Long)]
-
-    testStream(aggregated)(
-      AddData(inputData, 1, 2, 3, 4),
-      CheckLastBatch((0, 2), (1, 2)),
-      AddData(inputData, 1, 3, 5),
-      CheckLastBatch((1, 5))
-    )
-  }
-
   testQuietly("midbatch failure") {
     val inputData = MemoryStream[Int]
     FailureSinglton.firstTime = true
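The removed end-to-end test is superseded by the analysis-level checks added to UnsupportedOperationsSuite above. Roughly, the behavior now expected (a sketch; checkForStreaming is assumed to be the checker's entry point, which this diff does not show):

    // Building the nested streaming-aggregate plan should fail analysis with
    // a message naming the unsupported operation, instead of executing.
    val plan = Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), streamRelation))
    val e = intercept[AnalysisException] {
      UnsupportedOperationChecker.checkForStreaming(plan, outputMode = Update)
    }
    assert(e.getMessage.toLowerCase.contains("multiple streaming aggregations"))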