diff options
Diffstat (limited to 'streaming')
14 files changed, 87 insertions, 80 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7b27933403..26257e652e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -199,10 +199,7 @@ class StreamingContext private[streaming] ( */ def networkStream[T: ClassTag]( receiver: NetworkReceiver[T]): DStream[T] = { - val inputStream = new PluggableInputDStream[T](this, - receiver) - graph.addInputStream(inputStream) - inputStream + new PluggableInputDStream[T](this, receiver) } /** @@ -259,9 +256,7 @@ class StreamingContext private[streaming] ( converter: (InputStream) => Iterator[T], storageLevel: StorageLevel ): DStream[T] = { - val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel) - registerInputStream(inputStream) - inputStream + new SocketInputDStream[T](this, hostname, port, converter, storageLevel) } /** @@ -280,9 +275,7 @@ class StreamingContext private[streaming] ( port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[T] = { - val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) - registerInputStream(inputStream) - inputStream + new RawInputDStream[T](this, hostname, port, storageLevel) } /** @@ -300,9 +293,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String): DStream[(K, V)] = { - val inputStream = new FileInputDStream[K, V, F](this, directory) - registerInputStream(inputStream) - inputStream + new FileInputDStream[K, V, F](this, directory) } /** @@ -322,9 +313,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { - val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) - registerInputStream(inputStream) - inputStream + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) } /** @@ -367,9 +356,7 @@ class StreamingContext private[streaming] ( oneAtATime: Boolean, defaultRDD: RDD[T] ): DStream[T] = { - val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) - registerInputStream(inputStream) - inputStream + new QueueInputDStream(this, queue, oneAtATime, defaultRDD) } /** @@ -390,21 +377,6 @@ class StreamingContext private[streaming] ( new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } - /** - * Register an input stream that will be started (InputDStream.start() called) to get the - * input data. - */ - def registerInputStream(inputStream: InputDStream[_]) { - graph.addInputStream(inputStream) - } - - /** - * Register an output stream that will be computed every interval - */ - def registerOutputStream(outputStream: DStream[_]) { - graph.addOutputStream(outputStream) - } - /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for * receiving system events related to streaming. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 108950466a..4edf8fa13a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -320,13 +320,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Registers an output stream that will be computed every interval - */ - def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { - ssc.registerOutputStream(outputStream.dstream) - } - - /** * Creates a input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 426f61e24a..71a4c5c93e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -36,11 +36,12 @@ import org.apache.spark.streaming.Duration /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * sequence of RDDs (of the same type) representing a continuous stream of data (see - * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from - * live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation - * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`. - * While a Spark Streaming program is running, each DStream periodically generates a RDD, - * either from live data or by transforming the RDD generated by a parent DStream. + * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). + * DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS) + * or it can be generated by transforming existing DStreams using operations such as `map`, + * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream + * periodically generates a RDD, either from live data or by transforming the RDD generated by a + * parent DStream. * * This class contains the basic operations available on all DStreams, such as `map`, `filter` and * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains @@ -523,7 +524,7 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } /** @@ -590,8 +591,7 @@ abstract class DStream[T: ClassTag] ( if (first11.size > 10) println("...") println() } - val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) - ssc.registerOutputStream(newStream) + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } /** @@ -768,8 +768,8 @@ abstract class DStream[T: ClassTag] ( * Register this streaming as an output stream. This would ensure that RDDs of this * DStream will be generated. */ - def register(): DStream[T] = { - ssc.registerOutputStream(this) + private[streaming] def register(): DStream[T] = { + ssc.graph.addOutputStream(this) this } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index a1075ad304..27303390d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -22,20 +22,24 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext} import scala.reflect.ClassTag /** - * This is the abstract base class for all input streams. This class provides to methods - * start() and stop() which called by the scheduler to start and stop receiving data/ - * Input streams that can generated RDDs from new data just by running a service on - * the driver node (that is, without running a receiver onworker nodes) can be - * implemented by directly subclassing this InputDStream. For example, - * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for - * new files and generates RDDs on the new files. For implementing input streams - * that requires running a receiver on the worker nodes, use NetworkInputDStream - * as the parent class. + * This is the abstract base class for all input streams. This class provides methods + * start() and stop() which is called by Spark Streaming system to start and stop receiving data. + * Input streams that can generate RDDs from new data by running a service/thread only on + * the driver node (that is, without running a receiver on worker nodes), can be + * implemented by directly inheriting this InputDStream. For example, + * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for + * new files and generates RDDs with the new files. For implementing input streams + * that requires running a receiver on the worker nodes, use + * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class. + * + * @param ssc_ Streaming context that will execute this input stream */ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { - var lastValidTime: Time = null + private[streaming] var lastValidTime: Time = null + + ssc.graph.addInputStream(this) /** * Checks whether the 'time' is valid wrt slideDuration for generating RDD. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 0f1f6fc2ce..ce153f065d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -36,11 +36,12 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} /** - * Abstract class for defining any InputDStream that has to start a receiver on worker - * nodes to receive external data. Specific implementations of NetworkInputDStream must + * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] + * that has to start a receiver on worker nodes to receive external data. + * Specific implementations of NetworkInputDStream must * define the getReceiver() function that gets the receiver object of type - * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive - * data. + * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent + * to the workers to receive data. * @param ssc_ Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 6b3e48382e..f57762321c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -35,6 +35,11 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.{Time, Duration} +/** + * Extra functions available on DStream of (key, value) pairs through an implicit conversion. + * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use + * these functions. + */ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala new file mode 100644 index 0000000000..4dd985cf5a --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Spark Streaming functionality. [[org.apache.spark.streaming.StreamingContext]] serves as the main + * entry point to Spark Streaming, while [[org.apache.spark.streaming.dstream.DStream]] is the data + * type representing a continuous sequence of RDDs, representing a continuous stream of data. + * + * In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations + * available only on DStreams + * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically + * available on any DStream of the right type (e.g. DStream[(Int, Int)] through implicit + * conversions when you `import org.apache.spark.streaming.StreamingContext._`. + * + * For the Java API of Spark Streaming, take a look at the + * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and + * the [[org.apache.spark.streaming.api.java.JavaDStream]] and the + * [[org.apache.spark.streaming.api.java.JavaPairDStream]] which have the DStream functionality. + */ +package object streaming { + // For package docs only +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index be67af3a64..54813934b8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -191,7 +191,7 @@ object MasterFailureTest extends Logging { val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) val outputStream = new TestOutputStream(operatedStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 42ab9590d6..33f6df8f88 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) - ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) } @@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) - dstream.dstream.ssc.registerOutputStream(ostream) + ostream.register() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index b73edf81d4..bcb0c28bf0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -381,7 +381,6 @@ class BasicOperationsSuite extends TestSuiteBase { val ssc = new StreamingContext(conf, Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) - ssc.registerInputStream(stream) stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0c68c44ddb..89daf47586 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase { val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files and advance manual clock to process them diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a8e053278a..95bf40ba75 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) val outputStream = new TestOutputStream(fileStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Create files in the temporary directory so that Spark Streaming can read data from it @@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Feed data to the server to send to the network receiver @@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] val outputStream = new TestOutputStream(countStream, outputBuffer) def output = outputBuffer.flatMap(x => x) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() // Let the data from the receiver be received diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f7f3346f81..717da8e004 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -211,7 +211,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => (1 to i)) val inputStream = new TestInputStream(s, input, 1) - s.registerInputStream(inputStream) inputStream } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 535e5bd1f1..201630672a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) - ssc.registerInputStream(inputStream) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } @@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val operatedStream = operation(inputStream1, inputStream2) val outputStream = new TestOutputStreamWithPartitions(operatedStream, new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) - ssc.registerInputStream(inputStream1) - ssc.registerInputStream(inputStream2) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc } |