diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-13 23:23:46 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-13 23:23:46 -0800 |
commit | 4e497db8f3826cf5142b2165a08d02c6f3c2cd90 (patch) | |
tree | 9ec25e86ccf8986035215e51f7b0e1ba1b96dad6 /streaming/src/test | |
parent | 1233b3de01be1ff57910786f5f3e2e2a23e228ab (diff) | |
download | spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.tar.gz spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.tar.bz2 spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.zip |
Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. Updated a lot of documentation.
Diffstat (limited to 'streaming/src/test')
6 files changed, 8 insertions, 14 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 42ab9590d6..33f6df8f88 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) - ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) } @@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) - dstream.dstream.ssc.registerOutputStream(ostream) + ostream.register() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 7037aae234..cb53555f5c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -379,7 +379,6 @@ class BasicOperationsSuite extends TestSuiteBase { val ssc = new StreamingContext(conf, Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) - ssc.registerInputStream(stream) stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0c68c44ddb..89daf47586 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase { val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files and advance manual clock to process them diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a8e053278a..95bf40ba75 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) val outputStream = new TestOutputStream(fileStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files in the temporary directory so that Spark Streaming can read data from it @@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] val outputStream = new TestOutputStream(countStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Let the data from the receiver be received diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f7f3346f81..717da8e004 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -211,7 +211,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => (1 to i)) val inputStream = new TestInputStream(s, input, 1) - s.registerInputStream(inputStream) inputStream } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 535e5bd1f1..201630672a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) - ssc.registerInputStream(inputStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } @@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream1, inputStream2) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) - ssc.registerInputStream(inputStream1) - ssc.registerInputStream(inputStream2) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } |