diff options
Diffstat (limited to 'streaming/src/test')
6 files changed, 8 insertions, 14 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 42ab9590d6..33f6df8f88 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) - ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) } @@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) - dstream.dstream.ssc.registerOutputStream(ostream) + ostream.register() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index b73edf81d4..bcb0c28bf0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -381,7 +381,6 @@ class BasicOperationsSuite extends TestSuiteBase { val ssc = new StreamingContext(conf, Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) - ssc.registerInputStream(stream) stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0c68c44ddb..89daf47586 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase { val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files and advance manual clock to process them diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a8e053278a..95bf40ba75 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) val outputStream = new TestOutputStream(fileStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files in the temporary directory so that Spark Streaming can read data from it @@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] val outputStream = new TestOutputStream(countStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Let the data from the receiver be received diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f7f3346f81..717da8e004 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -211,7 +211,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => (1 to i)) val inputStream = new TestInputStream(s, input, 1) - s.registerInputStream(inputStream) inputStream } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 535e5bd1f1..201630672a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) - ssc.registerInputStream(inputStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } @@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream1, inputStream2) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) - ssc.registerInputStream(inputStream1) - ssc.registerInputStream(inputStream2) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } |