From 4e497db8f3826cf5142b2165a08d02c6f3c2cd90 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 13 Jan 2014 23:23:46 -0800 Subject: Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. Updated a lot of documentation. --- .../apache/spark/streaming/StreamingContext.scala | 40 ++++------------------ .../streaming/api/java/JavaStreamingContext.scala | 7 ---- .../apache/spark/streaming/dstream/DStream.scala | 25 ++++++++------ .../spark/streaming/dstream/InputDStream.scala | 24 +++++++------ .../streaming/dstream/NetworkInputDStream.scala | 9 ++--- .../streaming/dstream/PairDStreamFunctions.scala | 5 +++ .../scala/org/apache/spark/streaming/package.scala | 38 ++++++++++++++++++++ .../spark/streaming/util/MasterFailureTest.scala | 2 +- .../org/apache/spark/streaming/JavaTestUtils.scala | 3 +- .../spark/streaming/BasicOperationsSuite.scala | 1 - .../apache/spark/streaming/CheckpointSuite.scala | 2 +- .../apache/spark/streaming/InputStreamsSuite.scala | 8 ++--- .../spark/streaming/StreamingContextSuite.scala | 1 - .../org/apache/spark/streaming/TestSuiteBase.scala | 7 ++-- 14 files changed, 92 insertions(+), 80 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/package.scala (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7b27933403..26257e652e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -199,10 +199,7 @@ class StreamingContext private[streaming] ( */ def networkStream[T: ClassTag]( receiver: NetworkReceiver[T]): DStream[T] = { - val inputStream = new PluggableInputDStream[T](this, - receiver) - graph.addInputStream(inputStream) - inputStream + new PluggableInputDStream[T](this, receiver) } /** @@ -259,9 +256,7 @@ class StreamingContext private[streaming] ( converter: (InputStream) => Iterator[T], storageLevel: StorageLevel ): DStream[T] = { - val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel) - registerInputStream(inputStream) - inputStream + new SocketInputDStream[T](this, hostname, port, converter, storageLevel) } /** @@ -280,9 +275,7 @@ class StreamingContext private[streaming] ( port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[T] = { - val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) - registerInputStream(inputStream) - inputStream + new RawInputDStream[T](this, hostname, port, storageLevel) } /** @@ -300,9 +293,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String): DStream[(K, V)] = { - val inputStream = new FileInputDStream[K, V, F](this, directory) - registerInputStream(inputStream) - inputStream + new FileInputDStream[K, V, F](this, directory) } /** @@ -322,9 +313,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { - val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) - registerInputStream(inputStream) - inputStream + new FileInputDStream[K, V, F](this, 
directory, filter, newFilesOnly) } /** @@ -367,9 +356,7 @@ class StreamingContext private[streaming] ( oneAtATime: Boolean, defaultRDD: RDD[T] ): DStream[T] = { - val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) - registerInputStream(inputStream) - inputStream + new QueueInputDStream(this, queue, oneAtATime, defaultRDD) } /** @@ -390,21 +377,6 @@ class StreamingContext private[streaming] ( new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } - /** - * Register an input stream that will be started (InputDStream.start() called) to get the - * input data. - */ - def registerInputStream(inputStream: InputDStream[_]) { - graph.addInputStream(inputStream) - } - - /** - * Register an output stream that will be computed every interval - */ - def registerOutputStream(outputStream: DStream[_]) { - graph.addOutputStream(outputStream) - } - /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for * receiving system events related to streaming. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 108950466a..4edf8fa13a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -319,13 +319,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.actorStream[T](props, name) } - /** - * Registers an output stream that will be computed every interval - */ - def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { - ssc.registerOutputStream(outputStream.dstream) - } - /** * Creates a input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 9dfcc08abe..299628ce9f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -36,11 +36,12 @@ import org.apache.spark.streaming.Duration /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * sequence of RDDs (of the same type) representing a continuous stream of data (see - * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from - * live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation - * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`. - * While a Spark Streaming program is running, each DStream periodically generates a RDD, - * either from live data or by transforming the RDD generated by a parent DStream. + * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). + * DStreams can either be created from live data (such as data from Kafka, Flume, sockets, or HDFS) + * or they can be generated by transforming existing DStreams using operations such as `map`, + * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream + * periodically generates an RDD, either from live data or by transforming the RDD generated by a + * parent DStream. 
 * * This class contains the basic operations available on all DStreams, such as `map`, `filter` and * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains @@ -53,6 +54,8 @@ import org.apache.spark.streaming.Duration * - A list of other DStreams that the DStream depends on * - A time interval at which the DStream generates an RDD * - A function that is used to generate an RDD after each time interval + * + * There are two types of DStream operations - __transformations__ */ abstract class DStream[T: ClassTag] ( @@ -519,7 +522,7 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } /** @@ -586,8 +589,7 @@ abstract class DStream[T: ClassTag] ( if (first11.size > 10) println("...") println() } - val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) - ssc.registerOutputStream(newStream) + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } /** @@ -760,7 +762,10 @@ abstract class DStream[T: ClassTag] ( this.foreachRDD(saveFunc) } - def register() { - ssc.registerOutputStream(this) + /** + * Register this DStream as an output DStream. + */ + private[streaming] def register() { + ssc.graph.addOutputStream(this) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index a1075ad304..27303390d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -22,20 +22,24 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext} import scala.reflect.ClassTag /** - * This is the abstract base class for all input streams. This class provides to methods - * start() and stop() which called by the scheduler to start and stop receiving data/ - * Input streams that can generated RDDs from new data just by running a service on - * the driver node (that is, without running a receiver onworker nodes) can be - * implemented by directly subclassing this InputDStream. For example, - * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for - * new files and generates RDDs on the new files. For implementing input streams - * that requires running a receiver on the worker nodes, use NetworkInputDStream - * as the parent class. + * This is the abstract base class for all input streams. This class provides methods + * start() and stop() which are called by the Spark Streaming system to start and stop receiving data. + * Input streams that can generate RDDs from new data by running a service/thread only on + * the driver node (that is, without running a receiver on worker nodes) can be + * implemented by directly inheriting this InputDStream. For example, + * FileInputDStream, a subclass of InputDStream, monitors an HDFS directory from the driver for + * new files and generates RDDs with the new files. For implementing input streams + * that require running a receiver on the worker nodes, use + * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class. 
+ * + * @param ssc_ Streaming context that will execute this input stream */ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { - var lastValidTime: Time = null + private[streaming] var lastValidTime: Time = null + + ssc.graph.addInputStream(this) /** * Checks whether the 'time' is valid wrt slideDuration for generating RDD. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 0f1f6fc2ce..ce153f065d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -36,11 +36,12 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} /** - * Abstract class for defining any InputDStream that has to start a receiver on worker - * nodes to receive external data. Specific implementations of NetworkInputDStream must + * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] + * that has to start a receiver on worker nodes to receive external data. + * Specific implementations of NetworkInputDStream must * define the getReceiver() function that gets the receiver object of type - * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive - * data. + * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent + * to the workers to receive data. * @param ssc_ Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 6b3e48382e..f57762321c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -35,6 +35,11 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.{Time, Duration} +/** + * Extra functions available on DStream of (key, value) pairs through an implicit conversion. + * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use + * these functions. + */ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala new file mode 100644 index 0000000000..4dd985cf5a --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Spark Streaming functionality. [[org.apache.spark.streaming.StreamingContext]] serves as the main + * entry point to Spark Streaming, while [[org.apache.spark.streaming.dstream.DStream]] is the data + * type representing a continuous sequence of RDDs, which in turn represents a continuous stream of data. + * + * In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations + * available only on DStreams + * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically + * available on any DStream of the right type (e.g. DStream[(Int, Int)]) through implicit + * conversions when you `import org.apache.spark.streaming.StreamingContext._`. + * + * For the Java API of Spark Streaming, take a look at the + * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and + * the [[org.apache.spark.streaming.api.java.JavaDStream]] and the + * [[org.apache.spark.streaming.api.java.JavaPairDStream]] which have the DStream functionality. + */ +package object streaming { + // For package docs only +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index be67af3a64..54813934b8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -191,7 +191,7 @@ object MasterFailureTest extends Logging { val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) val outputStream = new TestOutputStream(operatedStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 42ab9590d6..33f6df8f88 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) - ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) } @@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) - dstream.dstream.ssc.registerOutputStream(ostream) + ostream.register() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 7037aae234..cb53555f5c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -379,7 +379,6 
@@ class BasicOperationsSuite extends TestSuiteBase { val ssc = new StreamingContext(conf, Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) - ssc.registerInputStream(stream) stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0c68c44ddb..89daf47586 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase { val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files and advance manual clock to process them diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a8e053278a..95bf40ba75 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) val outputStream = new TestOutputStream(fileStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files in the temporary directory so that Spark Streaming can read data from it @@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] val outputStream = new TestOutputStream(countStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Let the data from the receiver be received diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f7f3346f81..717da8e004 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -211,7 +211,6 @@ class 
StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => (1 to i)) val inputStream = new TestInputStream(s, input, 1) - s.registerInputStream(inputStream) inputStream } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 535e5bd1f1..201630672a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) - ssc.registerInputStream(inputStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } @@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream1, inputStream2) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) - ssc.registerInputStream(inputStream1) - ssc.registerInputStream(inputStream2) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } -- cgit v1.2.3 From f8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 14 Jan 2014 00:03:46 -0800 Subject: Fixed loose ends in docs. --- docs/streaming-programming-guide.md | 4 ++-- .../src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) (limited to 'streaming/src') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1495af2267..07c4c55633 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -48,10 +48,10 @@ ssc.textFileStream(directory) // Creates a stream that monitors and processes ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port {% endhighlight %} -The core Spark Streaming API provides input streams for files, sockets, Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section. +The core Spark Streaming API provides input streams for files, sockets, and Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section. # DStream Operations -Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to called, which writes data out to an external data sink like a file system or a database. +Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. 
Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to be called, which write data out to an external data sink like a file system or a database. ## Transformations diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 844316a1c7..71a4c5c93e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -54,8 +54,6 @@ import org.apache.spark.streaming.Duration * - A list of other DStreams that the DStream depends on * - A time interval at which the DStream generates an RDD * - A function that is used to generate an RDD after each time interval - * - * There are two types of DStream operations - __transformations__ */ abstract class DStream[T: ClassTag] ( -- cgit v1.2.3
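A minimal usage sketch of the API after these changes (not part of either patch above): input DStreams add themselves to the DStreamGraph when they are constructed, and output operations such as print() and foreachRDD() call the now-private[streaming] DStream.register() internally, so application code never calls registerInputStream or registerOutputStream. The master URL, app name, host/port and batch interval below are illustrative assumptions.

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // brings in PairDStreamFunctions (reduceByKey, etc.)

    object NetworkWordCountSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "NetworkWordCountSketch", Seconds(1))

        // The SocketInputDStream created here registers itself with the graph,
        // so no ssc.registerInputStream(...) call is needed.
        val lines = ssc.socketTextStream("localhost", 9999)

        val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

        // print() builds a ForEachDStream and calls register() on it internally,
        // replacing the removed ssc.registerOutputStream(...).
        counts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Likewise, a driver-only input stream is written by subclassing InputDStream directly; registration now happens in the InputDStream constructor. The sketch below (a hypothetical class, modelled on the existing ConstantInputDStream) emits the same RDD in every batch without running any receiver on the workers.

    import scala.reflect.ClassTag

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream

    // The RDD for each batch is produced on the driver; no receiver is started on worker nodes.
    class SameRDDInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
      extends InputDStream[T](ssc_) {

      override def start() {}  // nothing to start on the driver
      override def stop() {}   // nothing to stop

      override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
    }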