author     Tathagata Das <tathagata.das1565@gmail.com>  2016-06-10 10:48:28 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-06-10 10:48:28 -0700
commit     abdb5d42c5802c8f60876aa1285c803d02881258 (patch)
tree       2383dfbac02c4808a08b960879312756bd0f8e8b
parent     cdd7f5a57a21d4a8f93456d149f65859c96190cf (diff)
[SPARK-15812][SQL][STREAMING] Added support for sorting after streaming aggregation with complete mode
## What changes were proposed in this pull request?

When the output mode is Complete, the output of a streaming aggregation contains the complete aggregates every time, so it is no different from a batch dataset within an incremental execution. Other non-streaming operations should therefore be supportable on this dataset. This PR adds support only for sorting, as it is a common, useful piece of functionality; support for other operations will come later.

## How was this patch tested?

Additional unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13549 from tdas/SPARK-15812.
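(Illustration, not part of the commit: a minimal sketch of the user-facing pattern this change enables, modeled directly on the new test in StreamingAggregationSuite below. It assumes the streaming test harness's `MemoryStream` and Spark's SQL implicits for the `$` column syntax are in scope.)

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.count

// A streaming aggregation whose result is sorted. In Complete output mode
// every trigger emits the full aggregate table, so a global sort over it is
// well defined.
val inputData = MemoryStream[Int]
val aggregated = inputData.toDF()
  .groupBy($"value")
  .agg(count("*"))
  .toDF("value", "count")
  .orderBy($"count".desc)   // rejected before this patch; accepted in Complete mode
  .as[(Int, Long)]
```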
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 61
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 24
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala | 25
4 files changed, 95 insertions(+), 32 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8373fa336d..689e016a5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -43,6 +43,41 @@ object UnsupportedOperationChecker {
"Queries without streaming sources cannot be executed with write.startStream()")(plan)
}
+ // Disallow multiple streaming aggregations
+ val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
+
+ if (aggregates.size > 1) {
+ throwError(
+ "Multiple streaming aggregations are not supported with " +
+ "streaming DataFrames/Datasets")(plan)
+ }
+
+ // Disallow unsupported combinations of output mode and streaming aggregation
+ outputMode match {
+ case InternalOutputModes.Append if aggregates.nonEmpty =>
+ throwError(
+ s"$outputMode output mode not supported when there are streaming aggregations on " +
+ s"streaming DataFrames/DataSets")(plan)
+
+ case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+ throwError(
+ s"$outputMode output mode not supported when there are no streaming aggregations on " +
+ s"streaming DataFrames/Datasets")(plan)
+
+ case _ =>
+ }
+
+ /**
+ * Whether the subplan will contain complete data or incremental data in every incremental
+ * execution. Some operations may be allowed only when the child logical plan gives complete
+ * data.
+ */
+ def containsCompleteData(subplan: LogicalPlan): Boolean = {
+ val aggs = subplan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
+ // Either the subplan has no streaming source, or it has aggregation with Complete mode
+ !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
+ }
+
plan.foreachUp { implicit subPlan =>
// Operations that cannot exist anywhere in a streaming plan
@@ -107,8 +142,9 @@ object UnsupportedOperationChecker {
case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
throwError("Limits are not supported on streaming DataFrames/Datasets")
- case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) =>
- throwError("Sorting is not supported on streaming DataFrames/Datasets")
+ case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
+ throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
+ "aggregated DataFrame/Dataset in Complete mode")
case Sample(_, _, _, _, child) if child.isStreaming =>
throwError("Sampling is not supported on streaming DataFrames/Datasets")
@@ -123,27 +159,6 @@ object UnsupportedOperationChecker {
case _ =>
}
}
-
- // Checks related to aggregations
- val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
- outputMode match {
- case InternalOutputModes.Append if aggregates.nonEmpty =>
- throwError(
- s"$outputMode output mode not supported when there are streaming aggregations on " +
- s"streaming DataFrames/DataSets")(plan)
-
- case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
- throwError(
- s"$outputMode output mode not supported when there are no streaming aggregations on " +
- s"streaming DataFrames/Datasets")(plan)
-
- case _ =>
- }
- if (aggregates.size > 1) {
- throwError(
- "Multiple streaming aggregations are not supported with " +
- "streaming DataFrames/Datasets")(plan)
- }
}
private def throwErrorIf(
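(Illustration, not part of the commit: the acceptance rule that the new `Sort` case and `containsCompleteData` implement, reduced to a standalone toy model. All names here are hypothetical, not Spark API.)

```scala
// Toy model of the new Sort check. A sort over a streaming subplan is legal
// only when that subplan is effectively "complete": either it has no
// streaming source at all, or the query has a streaming aggregation and runs
// in Complete output mode, so every trigger emits the full result set rather
// than a delta.
sealed trait Mode
case object Append extends Mode
case object Complete extends Mode
case object Update extends Mode

def sortAllowed(subplanIsStreaming: Boolean,
                hasStreamingAggregate: Boolean,
                mode: Mode): Boolean =
  !subplanIsStreaming || (hasStreamingAggregate && mode == Complete)
```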
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 378cca3644..c21ad5e03a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -81,7 +81,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = "commands" :: Nil)
- // Multiple streaming aggregations not supported
+ // Aggregation: Multiple streaming aggregations not supported
def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
assertSupportedInStreamingPlan(
@@ -189,8 +189,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.intersect(_),
streamStreamSupported = false)
- // Unary operations
+ // Sort: supported only on batch subplans, or on streaming aggregations in Complete output mode
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
+ assertSupportedInStreamingPlan(
+ "sort - sort over aggregated data in Complete output mode",
+ streamRelation.groupBy()(Count("*")).sortBy(),
+ Complete)
+ assertNotSupportedInStreamingPlan(
+ "sort - sort over aggregated data in Update output mode",
+ streamRelation.groupBy()(Count("*")).sortBy(),
+ Update,
+ Seq("sort", "aggregat", "complete")) // sort on aggregations is supported on Complete mode only
+
+
+ // Other unary operations
testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
testUnaryOperatorInStreamingPlan(
"sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
@@ -299,6 +311,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode)
}
+ /** Test output mode with and without aggregation in the streaming plan */
def testOutputMode(
outputMode: OutputMode,
shouldSupportAggregation: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 194c3e7307..7f1e5fe613 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -111,10 +111,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
def apply[A : Encoder](data: A*): CheckAnswerRows = {
val encoder = encoderFor[A]
val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
- CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false)
+ CheckAnswerRows(
+ data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+ lastOnly = false,
+ isSorted = false)
}
- def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false)
+ def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false, false)
}
/**
@@ -123,15 +126,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
*/
object CheckLastBatch {
def apply[A : Encoder](data: A*): CheckAnswerRows = {
+ apply(isSorted = false, data: _*)
+ }
+
+ def apply[A: Encoder](isSorted: Boolean, data: A*): CheckAnswerRows = {
val encoder = encoderFor[A]
val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
- CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true)
+ CheckAnswerRows(
+ data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+ lastOnly = true,
+ isSorted = isSorted)
}
- def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true)
+ def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true, false)
}
- case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean)
+ case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean, isSorted: Boolean)
extends StreamAction with StreamMustBeRunning {
override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
@@ -414,7 +424,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
failTest("Error adding data", e)
}
- case CheckAnswerRows(expectedAnswer, lastOnly) =>
+ case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
verify(currentStream != null, "stream not running")
// Get the map of source index to the current source objects
val indexToSource = currentStream
@@ -436,7 +446,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
failTest("Exception while getting data from sink", e)
}
- QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+ QueryTest.sameRows(expectedAnswer, sparkAnswer, isSorted).foreach {
error => failTest(error)
}
}
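(Illustration, not part of the commit: `QueryTest.sameRows` is not shown in this diff; under the assumption that its third argument makes the comparison order-sensitive, its effect is roughly this sketch.)

```scala
// Sketch: order-sensitive vs. order-insensitive answer comparison.
def sameRowsSketch[A](expected: Seq[A], actual: Seq[A], isSorted: Boolean): Boolean =
  if (isSorted) {
    expected == actual  // rows must match positionally
  } else {
    // Compare as multisets: same rows with the same multiplicities.
    expected.groupBy(identity).map { case (r, rs) => r -> rs.size } ==
      actual.groupBy(identity).map { case (r, rs) => r -> rs.size }
  }
```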
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 1f174aee8c..8681199817 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -104,6 +104,31 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
}
}
+ test("sort after aggregate in complete mode") {
+ val inputData = MemoryStream[Int]
+
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .toDF("value", "count")
+ .orderBy($"count".desc)
+ .as[(Int, Long)]
+
+ testStream(aggregated, Complete)(
+ AddData(inputData, 3),
+ CheckLastBatch(isSorted = true, (3, 1)),
+ AddData(inputData, 2, 3),
+ CheckLastBatch(isSorted = true, (3, 2), (2, 1)),
+ StopStream,
+ StartStream(),
+ AddData(inputData, 3, 2, 1),
+ CheckLastBatch(isSorted = true, (3, 3), (2, 2), (1, 1)),
+ AddData(inputData, 4, 4, 4, 4),
+ CheckLastBatch(isSorted = true, (4, 4), (3, 3), (2, 2), (1, 1))
+ )
+ }
+
test("multiple keys") {
val inputData = MemoryStream[Int]