aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorLiwei Lin <lwlin7@gmail.com>2016-11-07 17:49:24 -0800
committerReynold Xin <rxin@databricks.com>2016-11-07 17:49:24 -0800
commitc1a0c66bd2662bc40f312da474c3b95229fe92d0 (patch)
tree34a081584fe90158d779824a9623c5e788d02687 /sql/core/src/test
parent9b0593d5e99bb919c4abb8d0836a126ec2eaf1d5 (diff)
downloadspark-c1a0c66bd2662bc40f312da474c3b95229fe92d0.tar.gz
spark-c1a0c66bd2662bc40f312da474c3b95229fe92d0.tar.bz2
spark-c1a0c66bd2662bc40f312da474c3b95229fe92d0.zip
[SPARK-18261][STRUCTURED STREAMING] Add statistics to MemorySink for joining
## What changes were proposed in this pull request? Right now, there is no way to join the output of a memory sink with any table: > UnsupportedOperationException: LeafNode MemoryPlan must implement statistics This patch adds statistics to MemorySink, making joining snapshots of memory streams with tables possible. ## How was this patch tested? Added a test case. Author: Liwei Lin <lwlin7@gmail.com> Closes #15786 from lw-lin/memory-sink-stat.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala16
1 files changed, 16 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 310d756302..4e9fba9dba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -187,6 +187,22 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
query.stop()
}
+ test("MemoryPlan statistics") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, InternalOutputModes.Append)
+ val plan = new MemoryPlan(sink)
+
+ // Before adding data, check output
+ checkAnswer(sink.allData, Seq.empty)
+ assert(plan.statistics.sizeInBytes === 0)
+
+ sink.addBatch(0, 1 to 3)
+ assert(plan.statistics.sizeInBytes === 12)
+
+ sink.addBatch(1, 4 to 6)
+ assert(plan.statistics.sizeInBytes === 24)
+ }
+
ignore("stress test") {
// Ignore the stress test as it takes several minutes to run
(0 until 1000).foreach { _ =>