author     Tathagata Das <tathagata.das1565@gmail.com>   2014-01-13 23:23:46 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2014-01-13 23:23:46 -0800
commit     4e497db8f3826cf5142b2165a08d02c6f3c2cd90 (patch)
tree       9ec25e86ccf8986035215e51f7b0e1ba1b96dad6 /streaming
parent     1233b3de01be1ff57910786f5f3e2e2a23e228ab (diff)
Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. Updated a lot of documentation.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                40
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala    7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                  25
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala             24
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala       9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala      5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/package.scala                          38
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala            2
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala                      3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala              1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala                   2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala                 8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala             1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                     7
14 files changed, 92 insertions, 80 deletions
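
A minimal Scala sketch of the user-facing effect described in the commit message: input streams now add themselves to the graph when constructed, and output streams are registered by output operations such as foreachRDD, so neither registerInputStream nor registerOutputStream is needed. The app name, master URL and directory below are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object RegistrationSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RegistrationSketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Previously: construct a stream, then call ssc.registerInputStream(...) /
        // ssc.registerOutputStream(...). Now the InputDStream constructor and the
        // output operations handle registration internally.
        val lines = ssc.textFileStream("/tmp/streaming-input")   // placeholder directory
        lines.foreachRDD(rdd => println(rdd.count()))            // registers an output stream

        ssc.start()
        ssc.awaitTermination()
      }
    }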
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7b27933403..26257e652e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -199,10 +199,7 @@ class StreamingContext private[streaming] (
*/
def networkStream[T: ClassTag](
receiver: NetworkReceiver[T]): DStream[T] = {
- val inputStream = new PluggableInputDStream[T](this,
- receiver)
- graph.addInputStream(inputStream)
- inputStream
+ new PluggableInputDStream[T](this, receiver)
}
/**
@@ -259,9 +256,7 @@ class StreamingContext private[streaming] (
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): DStream[T] = {
- val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
- registerInputStream(inputStream)
- inputStream
+ new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
/**
@@ -280,9 +275,7 @@ class StreamingContext private[streaming] (
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
- val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
- registerInputStream(inputStream)
- inputStream
+ new RawInputDStream[T](this, hostname, port, storageLevel)
}
/**
@@ -300,9 +293,7 @@ class StreamingContext private[streaming] (
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): DStream[(K, V)] = {
- val inputStream = new FileInputDStream[K, V, F](this, directory)
- registerInputStream(inputStream)
- inputStream
+ new FileInputDStream[K, V, F](this, directory)
}
/**
@@ -322,9 +313,7 @@ class StreamingContext private[streaming] (
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
- val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
- registerInputStream(inputStream)
- inputStream
+ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
/**
@@ -367,9 +356,7 @@ class StreamingContext private[streaming] (
oneAtATime: Boolean,
defaultRDD: RDD[T]
): DStream[T] = {
- val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
- registerInputStream(inputStream)
- inputStream
+ new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
}
/**
@@ -390,21 +377,6 @@ class StreamingContext private[streaming] (
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
- /**
- * Register an input stream that will be started (InputDStream.start() called) to get the
- * input data.
- */
- def registerInputStream(inputStream: InputDStream[_]) {
- graph.addInputStream(inputStream)
- }
-
- /**
- * Register an output stream that will be computed every interval
- */
- def registerOutputStream(outputStream: DStream[_]) {
- graph.addOutputStream(outputStream)
- }
-
/** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
* receiving system events related to streaming.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 108950466a..4edf8fa13a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -320,13 +320,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Registers an output stream that will be computed every interval
- */
- def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
- ssc.registerOutputStream(outputStream.dstream)
- }
-
- /**
* Creates a input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 9dfcc08abe..299628ce9f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -36,11 +36,12 @@ import org.apache.spark.streaming.Duration
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see
- * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
- * live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation
- * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
- * While a Spark Streaming program is running, each DStream periodically generates a RDD,
- * either from live data or by transforming the RDD generated by a parent DStream.
+ * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
+ * DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS)
+ * or it can be generated by transforming existing DStreams using operations such as `map`,
+ * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
+ * periodically generates a RDD, either from live data or by transforming the RDD generated by a
+ * parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
@@ -53,6 +54,8 @@ import org.apache.spark.streaming.Duration
* - A list of other DStreams that the DStream depends on
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
+ *
+ * There are two types of DStream operations - __transformations__
*/
abstract class DStream[T: ClassTag] (
@@ -519,7 +522,7 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
- ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
+ new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
/**
@@ -586,8 +589,7 @@ abstract class DStream[T: ClassTag] (
if (first11.size > 10) println("...")
println()
}
- val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
+ new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
/**
@@ -760,7 +762,10 @@ abstract class DStream[T: ClassTag] (
this.foreachRDD(saveFunc)
}
- def register() {
- ssc.registerOutputStream(this)
+ /**
+ * Register this DStream as an output DStream.
+ */
+ private[streaming] def register() {
+ ssc.graph.addOutputStream(this)
}
}
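
With register() now private[streaming], only code inside the streaming package can register an output stream directly; user code goes through output operations such as foreachRDD or print(). A hedged sketch of how a hypothetical output operation inside the package could follow the same pattern as the rewritten foreachRDD above (OutputHelpers and countAndLog are illustrative names, not part of this commit):

    package org.apache.spark.streaming.dstream

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    // Hypothetical helper for illustration only.
    private[streaming] object OutputHelpers {
      def countAndLog[T: ClassTag](stream: DStream[T]): Unit = {
        val func = (rdd: RDD[T], time: Time) => println(s"$time: ${rdd.count()} records")
        // Same pattern as foreachRDD after this change: build the ForEachDStream
        // and call the now package-private register() on it.
        new ForEachDStream(stream, stream.context.sparkContext.clean(func)).register()
      }
    }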
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a1075ad304..27303390d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -22,20 +22,24 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext}
import scala.reflect.ClassTag
/**
- * This is the abstract base class for all input streams. This class provides to methods
- * start() and stop() which called by the scheduler to start and stop receiving data/
- * Input streams that can generated RDDs from new data just by running a service on
- * the driver node (that is, without running a receiver onworker nodes) can be
- * implemented by directly subclassing this InputDStream. For example,
- * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for
- * new files and generates RDDs on the new files. For implementing input streams
- * that requires running a receiver on the worker nodes, use NetworkInputDStream
- * as the parent class.
+ * This is the abstract base class for all input streams. This class provides methods
+ * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
+ * Input streams that can generate RDDs from new data by running a service/thread only on
+ * the driver node (that is, without running a receiver on worker nodes), can be
+ * implemented by directly inheriting this InputDStream. For example,
+ * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
+ * new files and generates RDDs with the new files. For implementing input streams
+ * that requires running a receiver on the worker nodes, use
+ * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
+ *
+ * @param ssc_ Streaming context that will execute this input stream
*/
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
- var lastValidTime: Time = null
+ private[streaming] var lastValidTime: Time = null
+
+ ssc.graph.addInputStream(this)
/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
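
Because the InputDStream constructor now calls ssc.graph.addInputStream(this), any subclass is wired into the graph simply by being constructed. A hedged sketch of a driver-only input stream in the style the scaladoc above describes (ConstantRDDInputDStream is an illustrative class, not part of this commit):

    package org.apache.spark.streaming.dstream

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}

    // Returns the same RDD every batch; runs entirely on the driver, so no receiver is needed.
    class ConstantRDDInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
      extends InputDStream[T](ssc_) {   // the superclass constructor registers this stream

      override def start(): Unit = { }  // nothing to start on the driver
      override def stop(): Unit = { }
      override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
    }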
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 0f1f6fc2ce..ce153f065d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -36,11 +36,12 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
/**
- * Abstract class for defining any InputDStream that has to start a receiver on worker
- * nodes to receive external data. Specific implementations of NetworkInputDStream must
+ * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
+ * that has to start a receiver on worker nodes to receive external data.
+ * Specific implementations of NetworkInputDStream must
* define the getReceiver() function that gets the receiver object of type
- * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
- * data.
+ * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
+ * to the workers to receive data.
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 6b3e48382e..f57762321c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -35,6 +35,11 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.{Time, Duration}
+/**
+ * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
+ * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
+ * these functions.
+ */
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
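
An end-to-end sketch of the implicit conversion this new scaladoc describes: importing StreamingContext._ makes reduceByKey (and the other pair operations) available on a DStream of pairs. Paths and settings below are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._   // enables PairDStreamFunctions

    val conf = new SparkConf().setAppName("PairOpsSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.textFileStream("/tmp/streaming-input")   // placeholder directory
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()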
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
new file mode 100644
index 0000000000..4dd985cf5a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Spark Streaming functionality. [[org.apache.spark.streaming.StreamingContext]] serves as the main
+ * entry point to Spark Streaming, while [[org.apache.spark.streaming.dstream.DStream]] is the data
+ * type representing a continuous sequence of RDDs, representing a continuous stream of data.
+ *
+ * In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations
+ * available only on DStreams
+ * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically
+ * available on any DStream of the right type (e.g. DStream[(Int, Int)] through implicit
+ * conversions when you `import org.apache.spark.streaming.StreamingContext._`.
+ *
+ * For the Java API of Spark Streaming, take a look at the
+ * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and
+ * the [[org.apache.spark.streaming.api.java.JavaDStream]] and the
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]] which have the DStream functionality.
+ */
+package object streaming {
+ // For package docs only
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index be67af3a64..54813934b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -191,7 +191,7 @@ object MasterFailureTest extends Logging {
val inputStream = ssc.textFileStream(testDir.toString)
val operatedStream = operation(inputStream)
val outputStream = new TestOutputStream(operatedStream)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 42ab9590d6..33f6df8f88 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
- ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
}
@@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
- dstream.dstream.ssc.registerOutputStream(ostream)
+ ostream.register()
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae234..cb53555f5c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -379,7 +379,6 @@ class BasicOperationsSuite extends TestSuiteBase {
val ssc = new StreamingContext(conf, Seconds(1))
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
- ssc.registerInputStream(stream)
stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0c68c44ddb..89daf47586 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase {
val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Create files and advance manual clock to process them
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a8e053278a..95bf40ba75 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Feed data to the server to send to the network receiver
@@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
val outputStream = new TestOutputStream(fileStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
@@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Feed data to the server to send to the network receiver
@@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
// Let the data from the receiver be received
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f7f3346f81..717da8e004 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -211,7 +211,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => (1 to i))
val inputStream = new TestInputStream(s, input, 1)
- s.registerInputStream(inputStream)
inputStream
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 535e5bd1f1..201630672a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val operatedStream = operation(inputStream)
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
- ssc.registerInputStream(inputStream)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc
}
@@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val operatedStream = operation(inputStream1, inputStream2)
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
- ssc.registerInputStream(inputStream1)
- ssc.registerInputStream(inputStream2)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc
}
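
Taken together, the test changes above settle on one pattern: input streams join the graph in their constructor, and output streams call register(), which remains visible inside the org.apache.spark.streaming package where the test helpers live. A hedged sketch of that pattern using the existing TestInputStream/TestOutputStream helpers; RegistrationSketchSuite is an illustrative name, and `conf` is the suite configuration as used in BasicOperationsSuite above.

    package org.apache.spark.streaming

    class RegistrationSketchSuite extends TestSuiteBase {
      test("streams are registered without explicit StreamingContext calls") {
        val ssc = new StreamingContext(conf, Seconds(1))
        val input = Seq(Seq(1, 2), Seq(3, 4))
        val inputStream = new TestInputStream(ssc, input, 1)      // added to the graph by InputDStream's constructor
        val outputStream = new TestOutputStream(inputStream.map(_ * 2))
        outputStream.register()                                   // replaces ssc.registerOutputStream(outputStream)
        ssc.stop()
      }
    }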