Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala  |  183
1 file changed, 175 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 09c35bbf2c..e5bd0b4744 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -17,27 +17,132 @@
package org.apache.spark.sql.streaming
-import org.apache.spark.sql.{AnalysisException, Row, StreamTest}
+import scala.language.implicitConversions
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
-class MemorySinkSuite extends StreamTest with SharedSQLContext {
+class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+
import testImplicits._
- test("registering as a table") {
- testRegisterAsTable()
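+ // Stop any streaming queries a test left running so they don't leak into later tests.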
+ after {
+ sqlContext.streams.active.foreach(_.stop())
}
- ignore("stress test") {
- // Ignore the stress test as it takes several minutes to run
- (0 until 1000).foreach(_ => testRegisterAsTable())
+ test("directly add data in Append output mode") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, InternalOutputModes.Append)
+
+ // Before adding data, check output
+ assert(sink.latestBatchId === None)
+ checkAnswer(sink.latestBatchData, Seq.empty)
+ checkAnswer(sink.allData, Seq.empty)
+
+ // Add batch 0 and check outputs
+ sink.addBatch(0, 1 to 3)
+ assert(sink.latestBatchId === Some(0))
+ checkAnswer(sink.latestBatchData, 1 to 3)
+ checkAnswer(sink.allData, 1 to 3)
+
+ // Add batch 1 and check outputs
+ sink.addBatch(1, 4 to 6)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
+
+ // Re-add batch 1 with different data; it should be ignored and the outputs should not change
+ sink.addBatch(1, 7 to 9)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6)
+
+ // Add batch 2 and check outputs
+ sink.addBatch(2, 7 to 9)
+ assert(sink.latestBatchId === Some(2))
+ checkAnswer(sink.latestBatchData, 7 to 9)
+ checkAnswer(sink.allData, 1 to 9)
}
- private def testRegisterAsTable(): Unit = {
+ test("directly add data in Update output mode") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, InternalOutputModes.Update)
+
+ // Before adding data, check output
+ assert(sink.latestBatchId === None)
+ checkAnswer(sink.latestBatchData, Seq.empty)
+ checkAnswer(sink.allData, Seq.empty)
+
+ // Add batch 0 and check outputs
+ sink.addBatch(0, 1 to 3)
+ assert(sink.latestBatchId === Some(0))
+ checkAnswer(sink.latestBatchData, 1 to 3)
+ checkAnswer(sink.allData, 1 to 3)
+
+ // Add batch 1 and check outputs
+ sink.addBatch(1, 4 to 6)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
+
+ // Re-add batch 1 with different data; it should be ignored and the outputs should not change
+ sink.addBatch(1, 7 to 9)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6)
+
+ // Add batch 2 and check outputs
+ sink.addBatch(2, 7 to 9)
+ assert(sink.latestBatchId === Some(2))
+ checkAnswer(sink.latestBatchData, 7 to 9)
+ checkAnswer(sink.allData, 1 to 9)
+ }
+
+ test("directly add data in Complete output mode") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, InternalOutputModes.Complete)
+
+ // Before adding data, check output
+ assert(sink.latestBatchId === None)
+ checkAnswer(sink.latestBatchData, Seq.empty)
+ checkAnswer(sink.allData, Seq.empty)
+
+ // Add batch 0 and check outputs
+ sink.addBatch(0, 1 to 3)
+ assert(sink.latestBatchId === Some(0))
+ checkAnswer(sink.latestBatchData, 1 to 3)
+ checkAnswer(sink.allData, 1 to 3)
+
+ // Add batch 1 and check outputs
+ sink.addBatch(1, 4 to 6)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 4 to 6) // new data should replace old data
+
+ // Re-add batch 1 with different data; it should be ignored and the outputs should not change
+ sink.addBatch(1, 7 to 9)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 4 to 6)
+
+ // Add batch 2 and check outputs
+ sink.addBatch(2, 7 to 9)
+ assert(sink.latestBatchId === Some(2))
+ checkAnswer(sink.latestBatchData, 7 to 9)
+ checkAnswer(sink.allData, 7 to 9)
+ }
+
+ test("registering as a table in Append output mode") {
val input = MemoryStream[Int]
val query = input.toDF().write
.format("memory")
+ .outputMode("append")
.queryName("memStream")
.startStream()
input.addData(1, 2, 3)
@@ -56,6 +161,57 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
query.stop()
}
+ test("registering as a table in Complete output mode") {
+ val input = MemoryStream[Int]
+ val query = input.toDF()
+ .groupBy("value")
+ .count()
+ .write
+ .format("memory")
+ .outputMode("complete")
+ .queryName("memStream")
+ .startStream()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ checkDataset(
+ spark.table("memStream").as[(Int, Long)],
+ (1, 1L), (2, 1L), (3, 1L))
+
+ input.addData(4, 5, 6)
+ query.processAllAvailable()
+ checkDataset(
+ spark.table("memStream").as[(Int, Long)],
+ (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L))
+
+ query.stop()
+ }
+
+ ignore("stress test") {
+ // Ignore the stress test as it takes several minutes to run
+ (0 until 1000).foreach { _ =>
+ val input = MemoryStream[Int]
+ val query = input.toDF().write
+ .format("memory")
+ .queryName("memStream")
+ .startStream()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ checkDataset(
+ spark.table("memStream").as[Int],
+ 1, 2, 3)
+
+ input.addData(4, 5, 6)
+ query.processAllAvailable()
+ checkDataset(
+ spark.table("memStream").as[Int],
+ 1, 2, 3, 4, 5, 6)
+
+ query.stop()
+ }
+ }
+
test("error when no name is specified") {
val error = intercept[AnalysisException] {
val input = MemoryStream[Int]
@@ -88,4 +244,15 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
.startStream()
}
}
+
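+ // Compares a sink's Seq[Row] output with the expected ints by lifting both sides to DataFrames.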
+ private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = {
+ checkAnswer(
+ sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema),
+ intsToDF(expected)(schema))
+ }
+
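+ // Implicitly converts a Seq[Int] into a single-column DataFrame using the in-scope schema.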
+ private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = {
+ require(schema.fields.size === 1)
+ sqlContext.createDataset(seq).toDF(schema.fieldNames.head)
+ }
}
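
As a reading aid (not part of the patch), the batch semantics these tests pin down can be modeled without Spark at all. The sketch below is a hypothetical toy written against the behavior asserted above, and `ToySink` and its members are invented names, not Spark APIs: a re-delivered batch id is ignored, Append and Update accumulate rows, and Complete replaces them wholesale.

// Toy model (hypothetical; ToySink is not a Spark API) of the MemorySink
// batch semantics verified by the tests above.
object ToySink {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode // observed to accumulate, like Append
  case object Complete extends Mode

  final class Sink(mode: Mode) {
    private var batches = Vector.empty[(Long, Seq[Int])]

    def latestBatchId: Option[Long] = batches.lastOption.map(_._1)
    def latestBatchData: Seq[Int] = batches.lastOption.map(_._2).getOrElse(Nil)
    def allData: Seq[Int] = batches.flatMap(_._2)

    def addBatch(id: Long, rows: Seq[Int]): Unit = {
      // A batch id at or below the latest committed one is a no-op,
      // matching the "re-add batch 1" checks in the suite.
      if (!latestBatchId.exists(_ >= id)) {
        mode match {
          case Complete => batches = Vector(id -> rows) // replace everything
          case _        => batches :+= (id -> rows)     // accumulate
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new Sink(Complete)
    sink.addBatch(0, 1 to 3)
    sink.addBatch(1, 4 to 6)
    sink.addBatch(1, 7 to 9) // ignored: batch 1 was already committed
    assert(sink.allData == (4 to 6)) // Complete mode kept only the latest batch
    assert(sink.latestBatchId.contains(1L))
  }
}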