aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-12 19:02:27 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-12 19:02:27 -0800
commit034f89aaab1db95e8908432f2445d6841526efcf (patch)
tree894aebd0f08d22f6b78c4d1849e522c50b8b6730 /streaming/src/test
parent74d0126257838f29e3fad519b9f1a5acde88bef6 (diff)
downloadspark-034f89aaab1db95e8908432f2445d6841526efcf.tar.gz
spark-034f89aaab1db95e8908432f2445d6841526efcf.tar.bz2
spark-034f89aaab1db95e8908432f2445d6841526efcf.zip
Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala14
1 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 8f3c2dd86c..471c99fab4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
class WindowOperationsSuite extends TestSuiteBase {
@@ -144,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(3)
)
+ test("window - persistence level") {
+ val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+ val ssc = new StreamingContext(conf, batchDuration)
+ val inputStream = new TestInputStream[Int](ssc, input, 1)
+ val windowStream1 = inputStream.window(batchDuration * 2)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+ windowStream1.persist(StorageLevel.MEMORY_ONLY)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+ ssc.stop()
+ }
+
// Testing naive reduceByKeyAndWindow (without invertible function)
testReduceByKeyAndWindow(