diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-01-13 12:18:05 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-13 12:18:05 -0800 |
commit | b93f9d42f21f03163734ef97b2871db945e166da (patch) | |
tree | ec06b7bd5e6dd7c61061995c94f8d75661c58918 /streaming/src/test | |
parent | e6ed13f255d70de422711b979447690cdab7423b (diff) | |
parent | ffa1d38ef19a7d5c5c2fc173d1d2f54267449f80 (diff) | |
download | spark-b93f9d42f21f03163734ef97b2871db945e166da.tar.gz spark-b93f9d42f21f03163734ef97b2871db945e166da.tar.bz2 spark-b93f9d42f21f03163734ef97b2871db945e166da.zip |
Merge pull request #400 from tdas/dstream-move
Moved DStream and PairDSream to org.apache.spark.streaming.dstream
Similar to the package location of `org.apache.spark.rdd.RDD`, `DStream` has been moved from `org.apache.spark.streaming.DStream` to `org.apache.spark.streaming.dstream.DStream`. I know that the package name is a little long, but I think its better to keep it consistent with Spark's structure.
Also fixed persistence of windowed DStream. The RDDs generated generated by windowed DStream are essentially unions of underlying RDDs, and persistent these union RDDs would store numerous copies of the underlying data. Instead setting the persistence level on the windowed DStream is made to set the persistence level of the underlying DStream.
Diffstat (limited to 'streaming/src/test')
6 files changed, 21 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 9406e0e20a..7037aae234 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._ import util.ManualClock import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.streaming.dstream.DStream class BasicOperationsSuite extends TestSuiteBase { test("map") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 67ce5bc566..0c68c44ddb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -26,7 +26,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.FileInputDStream +import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import org.apache.spark.SparkConf diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a477d200c9..f7f3346f81 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkException, SparkConf, SparkContext} import org.apache.spark.util.{Utils, MetadataCleaner} +import org.apache.spark.streaming.dstream.DStream class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { @@ -186,7 +187,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.map(x => { throw new TestException("error in map task"); x}) - .foreach(_.count) + .foreachRDD(_.count) val exception = intercept[Exception] { ssc.start() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index fa64142096..9e0f2c900e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.ArrayBuffer import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.streaming.dstream.DStream class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 9b2bb57e77..535e5bd1f1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.util.ManualClock import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index c39abfc21b..471c99fab4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.storage.StorageLevel class WindowOperationsSuite extends TestSuiteBase { @@ -143,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase { Seconds(3) ) + test("window - persistence level") { + val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)) + val ssc = new StreamingContext(conf, batchDuration) + val inputStream = new TestInputStream[Int](ssc, input, 1) + val windowStream1 = inputStream.window(batchDuration * 2) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER) + windowStream1.persist(StorageLevel.MEMORY_ONLY) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY) + ssc.stop() + } + // Testing naive reduceByKeyAndWindow (without invertible function) testReduceByKeyAndWindow( |