aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-13 23:23:46 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-13 23:23:46 -0800
commit4e497db8f3826cf5142b2165a08d02c6f3c2cd90 (patch)
tree9ec25e86ccf8986035215e51f7b0e1ba1b96dad6 /streaming/src/test
parent1233b3de01be1ff57910786f5f3e2e2a23e228ab (diff)
downloadspark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.tar.gz
spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.tar.bz2
spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.zip
Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. Updated a lot of documentation.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala3
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala8
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala7
6 files changed, 8 insertions, 14 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 42ab9590d6..33f6df8f88 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
- ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
}
@@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
- dstream.dstream.ssc.registerOutputStream(ostream)
+ ostream.register()
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae234..cb53555f5c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -379,7 +379,6 @@ class BasicOperationsSuite extends TestSuiteBase {
val ssc = new StreamingContext(conf, Seconds(1))
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
- ssc.registerInputStream(stream)
stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0c68c44ddb..89daf47586 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase {
val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Create files and advance manual clock to process them
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a8e053278a..95bf40ba75 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Feed data to the server to send to the network receiver
@@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
val outputStream = new TestOutputStream(fileStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
@@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Feed data to the server to send to the network receiver
@@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Let the data from the receiver be received
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f7f3346f81..717da8e004 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -211,7 +211,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => (1 to i))
val inputStream = new TestInputStream(s, input, 1)
- s.registerInputStream(inputStream)
inputStream
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 535e5bd1f1..201630672a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val operatedStream = operation(inputStream)
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
- ssc.registerInputStream(inputStream)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc
}
@@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val operatedStream = operation(inputStream1, inputStream2)
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
- ssc.registerInputStream(inputStream1)
- ssc.registerInputStream(inputStream2)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc
}