diff options
Diffstat (limited to 'streaming/src/test')
6 files changed, 21 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 9406e0e20a..7037aae234 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._ import util.ManualClock import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.streaming.dstream.DStream class BasicOperationsSuite extends TestSuiteBase { test("map") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 67ce5bc566..0c68c44ddb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -26,7 +26,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.FileInputDStream +import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import org.apache.spark.SparkConf diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a477d200c9..f7f3346f81 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkException, SparkConf, SparkContext} import org.apache.spark.util.{Utils, MetadataCleaner} +import org.apache.spark.streaming.dstream.DStream class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { @@ -186,7 +187,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.map(x => { throw new TestException("error in map task"); x}) - .foreach(_.count) + .foreachRDD(_.count) val exception = intercept[Exception] { ssc.start() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index fa64142096..9e0f2c900e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.ArrayBuffer import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.streaming.dstream.DStream class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 9b2bb57e77..535e5bd1f1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.util.ManualClock import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index c39abfc21b..471c99fab4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.storage.StorageLevel class WindowOperationsSuite extends TestSuiteBase { @@ -143,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase { Seconds(3) ) + test("window - persistence level") { + val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)) + val ssc = new StreamingContext(conf, batchDuration) + val inputStream = new TestInputStream[Int](ssc, input, 1) + val windowStream1 = inputStream.window(batchDuration * 2) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER) + windowStream1.persist(StorageLevel.MEMORY_ONLY) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY) + ssc.stop() + } + // Testing naive reduceByKeyAndWindow (without invertible function) testReduceByKeyAndWindow( |