From abdb5d42c5802c8f60876aa1285c803d02881258 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 10 Jun 2016 10:48:28 -0700
Subject: [SPARK-15812][SQL][STREAMING] Added support for sorting after
 streaming aggregation with complete mode

## What changes were proposed in this pull request?

When the output mode is Complete, the output of a streaming aggregation contains the complete aggregates every time, so it is no different from a batch Dataset within an incremental execution. Other non-streaming operations should therefore be supported on this Dataset. In this PR, I am adding support only for sorting, as it is a common and useful operation. Support for other operations will come later.

## How was this patch tested?

Additional unit tests.

Author: Tathagata Das

Closes #13549 from tdas/SPARK-15812.
---
 .../analysis/UnsupportedOperationChecker.scala     | 61 ++++++++++++++--------
 .../analysis/UnsupportedOperationsSuite.scala      | 17 +++++-
 .../apache/spark/sql/streaming/StreamTest.scala    | 24 ++++++---
 .../sql/streaming/StreamingAggregationSuite.scala  | 25 +++++++++
 4 files changed, 95 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8373fa336d..689e016a5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -43,6 +43,41 @@ object UnsupportedOperationChecker {
         "Queries without streaming sources cannot be executed with write.startStream()")(plan)
     }

+    // Disallow multiple streaming aggregations
+    val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
+
+    if (aggregates.size > 1) {
+      throwError(
+        "Multiple streaming aggregations are not supported with " +
+          "streaming DataFrames/Datasets")(plan)
+    }
+
+    // Disallow some output modes
+    outputMode match {
+      case InternalOutputModes.Append if aggregates.nonEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are streaming aggregations on " +
+            s"streaming DataFrames/Datasets")(plan)
+
+      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are no streaming aggregations on " +
+            s"streaming DataFrames/Datasets")(plan)
+
+      case _ =>
+    }
+
+    /**
+     * Whether the subplan will contain complete data or incremental data in every incremental
+     * execution. Some operations may be allowed only when the child logical plan gives complete
+     * data.
+     */
+    def containsCompleteData(subplan: LogicalPlan): Boolean = {
+      val aggs = subplan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
+      // Either the subplan has no streaming source, or it has aggregation with Complete mode
+      !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
+    }
+
     plan.foreachUp { implicit subPlan =>

       // Operations that cannot exists anywhere in a streaming plan
@@ -107,8 +142,9 @@ object UnsupportedOperationChecker {
         case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
           throwError("Limits are not supported on streaming DataFrames/Datasets")

-        case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) =>
-          throwError("Sorting is not supported on streaming DataFrames/Datasets")
+        case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
+          throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is " +
+            "on an aggregated DataFrame/Dataset in Complete output mode")

         case Sample(_, _, _, _, child) if child.isStreaming =>
           throwError("Sampling is not supported on streaming DataFrames/Datasets")
@@ -123,27 +159,6 @@ object UnsupportedOperationChecker {
         case _ =>
       }
     }
-
-    // Checks related to aggregations
-    val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
-    outputMode match {
-      case InternalOutputModes.Append if aggregates.nonEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are streaming aggregations on " +
-            s"streaming DataFrames/DataSets")(plan)
-
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are no streaming aggregations on " +
-            s"streaming DataFrames/Datasets")(plan)
-
-      case _ =>
-    }
-    if (aggregates.size > 1) {
-      throwError(
-        "Multiple streaming aggregations are not supported with " +
-          "streaming DataFrames/Datasets")(plan)
-    }
   }

   private def throwErrorIf(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 378cca3644..c21ad5e03a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -81,7 +81,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append,
     expectedMsgs = "commands" :: Nil)

-  // Multiple streaming aggregations not supported
+  // Aggregation: Multiple streaming aggregations not supported
   def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))

   assertSupportedInStreamingPlan(
@@ -189,8 +189,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.intersect(_),
     streamStreamSupported = false)

-  // Unary operations
+  // Sort: supported only on batch subplans, and on streaming aggregates in Complete output mode
   testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
+  assertSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Complete output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Complete)
+  assertNotSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Update output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Update,
+    Seq("sort", "aggregat", "complete")) // sort on aggregates is supported in Complete mode only
+
+
+  // Other unary operations
   testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
@@ -299,6 +311,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
       outputMode)
   }

+  /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
       shouldSupportAggregation: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 194c3e7307..7f1e5fe613 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -111,10 +111,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = false,
+        isSorted = false)
     }

-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false, false)
   }

   /**
@@ -123,15 +126,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    */
   object CheckLastBatch {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      apply(isSorted = false, data: _*)
+    }
+
+    def apply[A: Encoder](isSorted: Boolean, data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = true,
+        isSorted = isSorted)
     }

-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true, false)
   }

-  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean)
+  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean, isSorted: Boolean)
     extends StreamAction with StreamMustBeRunning {
     override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
@@ -414,7 +424,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
           failTest("Error adding data", e)
         }

-      case CheckAnswerRows(expectedAnswer, lastOnly) =>
+      case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
         verify(currentStream != null, "stream not running")
         // Get the map of source index to the current source objects
         val indexToSource = currentStream
@@ -436,7 +446,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             failTest("Exception while getting data from sink", e)
         }

-        QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+        QueryTest.sameRows(expectedAnswer, sparkAnswer, isSorted).foreach {
           error => failTest(error)
         }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 1f174aee8c..8681199817 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -104,6 +104,31 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
     }
   }

+  test("sort after aggregate in complete mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .toDF("value", "count")
+        .orderBy($"count".desc)
+        .as[(Int, Long)]
+
+    testStream(aggregated, Complete)(
+      AddData(inputData, 3),
+      CheckLastBatch(isSorted = true, (3, 1)),
+      AddData(inputData, 2, 3),
+      CheckLastBatch(isSorted = true, (3, 2), (2, 1)),
+      StopStream,
+      StartStream(),
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch(isSorted = true, (3, 3), (2, 2), (1, 1)),
+      AddData(inputData, 4, 4, 4, 4),
+      CheckLastBatch(isSorted = true, (4, 4), (3, 3), (2, 2), (1, 1))
+    )
+  }
+
   test("multiple keys") {
     val inputData = MemoryStream[Int]
-- 
cgit v1.2.3
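
For readers who want to see the new behavior end to end, here is a minimal, self-contained sketch (not part of the patch) of the kind of query this change makes legal. It is written against the DataStreamWriter API (writeStream / outputMode) that replaced write.startStream() shortly after this commit; the socket source, host/port, and console sink are illustrative assumptions, not anything taken from the patch.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.count

    object SortedStreamingCounts {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("sort-after-streaming-aggregation")
          .getOrCreate()
        import spark.implicits._

        // Streaming source: lines of text from a local socket (hypothetical host/port).
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()
          .as[String]

        // A streaming aggregation followed by a sort. In Complete mode every
        // trigger emits the full aggregate result, so ordering it is well
        // defined; this is exactly the plan shape that containsCompleteData
        // above accepts.
        val counts = lines
          .groupBy($"value")
          .agg(count("*").as("count"))
          .orderBy($"count".desc)

        val query = counts.writeStream
          .outputMode("complete") // "append" or "update" would fail analysis
          .format("console")
          .start()

        query.awaitTermination()
      }
    }

With outputMode("update") or outputMode("append"), the same plan is rejected by the Sort case added to UnsupportedOperationChecker above, since containsCompleteData returns true for a streaming subplan only when there is a streaming aggregation and the output mode is Complete.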