diff options
author | Liwei Lin <lwlin7@gmail.com> | 2016-07-07 10:40:42 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-07-07 10:40:42 -0700 |
commit | 0f7175def985a7f1e37198680f893e749612ab76 (patch) | |
tree | 42726d797869d938765d58eca92aa18758b5e09a /sql/core/src/test | |
parent | a04cab8f17fcac05f86d2c472558ab98923f91e3 (diff) | |
download | spark-0f7175def985a7f1e37198680f893e749612ab76.tar.gz spark-0f7175def985a7f1e37198680f893e749612ab76.tar.bz2 spark-0f7175def985a7f1e37198680f893e749612ab76.zip |
[SPARK-16350][SQL] Fix support for incremental planning in wirteStream.foreach()
## What changes were proposed in this pull request?
There are cases where `complete` output mode does not output updated aggregated value; for details please refer to [SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350).
The cause is that, as we do `data.as[T].foreachPartition { iter => ... }` in `ForeachSink.addBatch()`, `foreachPartition()` does not support incremental planning for now.
This patches makes `foreachPartition()` support incremental planning in `ForeachSink`, by making a special version of `Dataset` with its `rdd()` method supporting incremental planning.
## How was this patch tested?
Added a unit test which failed before the change
Author: Liwei Lin <lwlin7@gmail.com>
Closes #14030 from lw-lin/fix-foreach-complete.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala | 86 |
1 files changed, 77 insertions, 9 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index 6ff597c16b..7928b8e877 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import org.scalatest.BeforeAndAfter import org.apache.spark.sql.ForeachWriter -import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.{OutputMode, StreamTest} import org.apache.spark.sql.test.SharedSQLContext class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { @@ -35,35 +35,103 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf sqlContext.streams.active.foreach(_.stop()) } - test("foreach") { + test("foreach() with `append` output mode") { withTempDir { checkpointDir => val input = MemoryStream[Int] val query = input.toDS().repartition(2).writeStream .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Append) .foreach(new TestForeachWriter()) .start() + + // -- batch 0 --------------------------------------- input.addData(1, 2, 3, 4) query.processAllAvailable() - val expectedEventsForPartition0 = Seq( + var expectedEventsForPartition0 = Seq( ForeachSinkSuite.Open(partition = 0, version = 0), ForeachSinkSuite.Process(value = 1), ForeachSinkSuite.Process(value = 3), ForeachSinkSuite.Close(None) ) - val expectedEventsForPartition1 = Seq( + var expectedEventsForPartition1 = Seq( ForeachSinkSuite.Open(partition = 1, version = 0), ForeachSinkSuite.Process(value = 2), ForeachSinkSuite.Process(value = 4), ForeachSinkSuite.Close(None) ) - val allEvents = ForeachSinkSuite.allEvents() + var allEvents = ForeachSinkSuite.allEvents() + assert(allEvents.size === 2) + assert(allEvents.toSet === Set(expectedEventsForPartition0, expectedEventsForPartition1)) + + ForeachSinkSuite.clear() + + // -- batch 1 --------------------------------------- + input.addData(5, 6, 7, 8) + query.processAllAvailable() + + expectedEventsForPartition0 = Seq( + ForeachSinkSuite.Open(partition = 0, version = 1), + ForeachSinkSuite.Process(value = 5), + ForeachSinkSuite.Process(value = 7), + ForeachSinkSuite.Close(None) + ) + expectedEventsForPartition1 = Seq( + ForeachSinkSuite.Open(partition = 1, version = 1), + ForeachSinkSuite.Process(value = 6), + ForeachSinkSuite.Process(value = 8), + ForeachSinkSuite.Close(None) + ) + + allEvents = ForeachSinkSuite.allEvents() assert(allEvents.size === 2) - assert { - allEvents === Seq(expectedEventsForPartition0, expectedEventsForPartition1) || - allEvents === Seq(expectedEventsForPartition1, expectedEventsForPartition0) - } + assert(allEvents.toSet === Set(expectedEventsForPartition0, expectedEventsForPartition1)) + + query.stop() + } + } + + test("foreach() with `complete` output mode") { + withTempDir { checkpointDir => + val input = MemoryStream[Int] + + val query = input.toDS() + .groupBy().count().as[Long].map(_.toInt) + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .outputMode(OutputMode.Complete) + .foreach(new TestForeachWriter()) + .start() + + // -- batch 0 --------------------------------------- + input.addData(1, 2, 3, 4) + query.processAllAvailable() + + var allEvents = ForeachSinkSuite.allEvents() + assert(allEvents.size === 1) + var expectedEvents = Seq( + ForeachSinkSuite.Open(partition = 0, version = 0), + ForeachSinkSuite.Process(value = 4), + ForeachSinkSuite.Close(None) + ) + assert(allEvents === Seq(expectedEvents)) + + ForeachSinkSuite.clear() + + // -- batch 1 --------------------------------------- + input.addData(5, 6, 7, 8) + query.processAllAvailable() + + allEvents = ForeachSinkSuite.allEvents() + assert(allEvents.size === 1) + expectedEvents = Seq( + ForeachSinkSuite.Open(partition = 0, version = 1), + ForeachSinkSuite.Process(value = 8), + ForeachSinkSuite.Close(None) + ) + assert(allEvents === Seq(expectedEvents)) + query.stop() } } |