aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala3
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala15
6 files changed, 21 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 9406e0e20a..7037aae234 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
import util.ManualClock
import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.streaming.dstream.DStream
class BasicOperationsSuite extends TestSuiteBase {
test("map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 67ce5bc566..0c68c44ddb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -26,7 +26,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.FileInputDStream
+import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import org.apache.spark.SparkConf
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a477d200c9..f7f3346f81 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkException, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, MetadataCleaner}
+import org.apache.spark.streaming.dstream.DStream
class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
@@ -186,7 +187,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => { throw new TestException("error in map task"); x})
- .foreach(_.count)
+ .foreachRDD(_.count)
val exception = intercept[Exception] {
ssc.start()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index fa64142096..9e0f2c900e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.streaming.dstream.DStream
class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 9b2bb57e77..535e5bd1f1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
import org.apache.spark.streaming.util.ManualClock
import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index c39abfc21b..471c99fab4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
class WindowOperationsSuite extends TestSuiteBase {
@@ -143,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(3)
)
+ test("window - persistence level") {
+ val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+ val ssc = new StreamingContext(conf, batchDuration)
+ val inputStream = new TestInputStream[Int](ssc, input, 1)
+ val windowStream1 = inputStream.window(batchDuration * 2)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+ windowStream1.persist(StorageLevel.MEMORY_ONLY)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+ ssc.stop()
+ }
+
// Testing naive reduceByKeyAndWindow (without invertible function)
testReduceByKeyAndWindow(