author | Patrick Wendell <pwendell@gmail.com> | 2014-01-14 00:05:37 -0800
---|---|---
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-14 00:05:37 -0800
commit | 980250b1ee0cdba9cf06ea87c790a2d504bbf03e | (patch)
tree | 547a023f4a672ab84299762d71e40d19dc2cf92f | /streaming/src/main/scala/org
parent | 055be5c6940a82e2d7fa39f968a434643a1eb1e2 | (diff)
parent | f8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 | (diff)
Merge pull request #416 from tdas/filestream-fix
Removed unnecessary DStream operations and updated docs
Removed StreamingContext.registerInputStream and registerOutputStream, as they were unnecessary: InputDStream has been made to register itself, and merely registering a DStream as an output stream causes RDD objects to be created without the RDDs ever being computed. Also made DStream.register() private[streaming] for the same reason.
Updated docs; in particular, added package documentation for the streaming package.
Also changed NetworkWordCount's input storage level to MEMORY_ONLY, because replication on the local machine fails and the resulting warning messages can scare a new user trying out his/her first example.
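For illustration, a minimal sketch of what this API change means for user code (the socket source, host, and port are placeholders, not part of this patch): input streams register themselves on construction, and output operations alone materialize a stream.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // implicit pair functions

val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))

// Input streams now register themselves with the DStreamGraph when
// constructed; there is no registerInputStream(...) to call.
val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

// Output operations such as print() register the output stream internally
// (via the now private[streaming] DStream.register()), so there is no
// registerOutputStream(...) either.
counts.print()

ssc.start()
```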
Diffstat (limited to 'streaming/src/main/scala/org')
8 files changed, 79 insertions, 66 deletions
```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7b27933403..26257e652e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -199,10 +199,7 @@ class StreamingContext private[streaming] (
    */
   def networkStream[T: ClassTag](
     receiver: NetworkReceiver[T]): DStream[T] = {
-    val inputStream = new PluggableInputDStream[T](this,
-      receiver)
-    graph.addInputStream(inputStream)
-    inputStream
+    new PluggableInputDStream[T](this, receiver)
   }
 
   /**
@@ -259,9 +256,7 @@ class StreamingContext private[streaming] (
       converter: (InputStream) => Iterator[T],
       storageLevel: StorageLevel
     ): DStream[T] = {
-    val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
-    registerInputStream(inputStream)
-    inputStream
+    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
   }
 
   /**
@@ -280,9 +275,7 @@ class StreamingContext private[streaming] (
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[T] = {
-    val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
-    registerInputStream(inputStream)
-    inputStream
+    new RawInputDStream[T](this, hostname, port, storageLevel)
   }
 
   /**
@@ -300,9 +293,7 @@ class StreamingContext private[streaming] (
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
   ] (directory: String): DStream[(K, V)] = {
-    val inputStream = new FileInputDStream[K, V, F](this, directory)
-    registerInputStream(inputStream)
-    inputStream
+    new FileInputDStream[K, V, F](this, directory)
   }
 
   /**
@@ -322,9 +313,7 @@ class StreamingContext private[streaming] (
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
   ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
-    val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
-    registerInputStream(inputStream)
-    inputStream
+    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
   }
 
   /**
@@ -367,9 +356,7 @@ class StreamingContext private[streaming] (
       oneAtATime: Boolean,
       defaultRDD: RDD[T]
     ): DStream[T] = {
-    val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
-    registerInputStream(inputStream)
-    inputStream
+    new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
   }
 
   /**
@@ -390,21 +377,6 @@ class StreamingContext private[streaming] (
     new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
   }
 
-  /**
-   * Register an input stream that will be started (InputDStream.start() called) to get the
-   * input data.
-   */
-  def registerInputStream(inputStream: InputDStream[_]) {
-    graph.addInputStream(inputStream)
-  }
-
-  /**
-   * Register an output stream that will be computed every interval
-   */
-  def registerOutputStream(outputStream: DStream[_]) {
-    graph.addOutputStream(outputStream)
-  }
-
   /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
    * receiving system events related to streaming.
    */
```
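As a sketch of the new factory-method behavior (the queue contents below are illustrative): the returned DStream is already wired into the DStreamGraph, so there is nothing left for the caller to register.

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "QueueStreamExample", Seconds(1))

// queueStream, like the other factory methods above, now just constructs the
// InputDStream; the stream's own constructor adds it to the graph.
val rddQueue = new Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(_ * 2).print()

rddQueue += ssc.sparkContext.makeRDD(1 to 10)
ssc.start()
```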
```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 108950466a..4edf8fa13a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -320,13 +320,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Registers an output stream that will be computed every interval
-   */
-  def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
-    ssc.registerOutputStream(outputStream.dstream)
-  }
-
-  /**
    * Creates a input stream from an queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
```

```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 426f61e24a..71a4c5c93e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -36,11 +36,12 @@ import org.apache.spark.streaming.Duration
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
  * sequence of RDDs (of the same type) representing a continuous stream of data (see
- * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
- * live data (such as, data from HDFS, Kafka or Flume) or it can be generated by transformation
- * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
- * While a Spark Streaming program is running, each DStream periodically generates a RDD,
- * either from live data or by transforming the RDD generated by a parent DStream.
+ * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
+ * DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS)
+ * or it can be generated by transforming existing DStreams using operations such as `map`,
+ * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
+ * periodically generates a RDD, either from live data or by transforming the RDD generated by a
+ * parent DStream.
  *
  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
  * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
@@ -523,7 +524,7 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
-    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
   /**
@@ -590,8 +591,7 @@ abstract class DStream[T: ClassTag] (
       if (first11.size > 10) println("...")
       println()
     }
-    val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
-    ssc.registerOutputStream(newStream)
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
   /**
@@ -768,8 +768,8 @@ abstract class DStream[T: ClassTag] (
    * Register this streaming as an output stream. This would ensure that RDDs of this
    * DStream will be generated.
    */
-  def register(): DStream[T] = {
-    ssc.registerOutputStream(this)
+  private[streaming] def register(): DStream[T] = {
+    ssc.graph.addOutputStream(this)
     this
   }
 }
```
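A short sketch of the unchanged user-facing path (host and port are placeholders): foreachRDD still materializes the stream; only the internal registration call has moved.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

val ssc = new StreamingContext("local[2]", "ForeachRDDExample", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)

// foreachRDD builds a ForEachDStream and, after this patch, calls
// .register() on it directly rather than ssc.registerOutputStream(...).
lines.foreachRDD { (rdd: RDD[String], time: Time) =>
  println("Batch at " + time + ": " + rdd.count() + " records")
}

ssc.start()
```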
```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a1075ad304..27303390d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -22,20 +22,24 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext}
 import scala.reflect.ClassTag
 
 /**
- * This is the abstract base class for all input streams. This class provides to methods
- * start() and stop() which called by the scheduler to start and stop receiving data/
- * Input streams that can generated RDDs from new data just by running a service on
- * the driver node (that is, without running a receiver onworker nodes) can be
- * implemented by directly subclassing this InputDStream. For example,
- * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for
- * new files and generates RDDs on the new files. For implementing input streams
- * that requires running a receiver on the worker nodes, use NetworkInputDStream
- * as the parent class.
+ * This is the abstract base class for all input streams. This class provides methods
+ * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
+ * Input streams that can generate RDDs from new data by running a service/thread only on
+ * the driver node (that is, without running a receiver on worker nodes), can be
+ * implemented by directly inheriting this InputDStream. For example,
+ * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
+ * new files and generates RDDs with the new files. For implementing input streams
+ * that requires running a receiver on the worker nodes, use
+ * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
+ *
+ * @param ssc_ Streaming context that will execute this input stream
  */
 abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   extends DStream[T](ssc_) {
 
-  var lastValidTime: Time = null
+  private[streaming] var lastValidTime: Time = null
+
+  ssc.graph.addInputStream(this)
 
   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
```

```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 0f1f6fc2ce..ce153f065d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -36,11 +36,12 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
 
 /**
- * Abstract class for defining any InputDStream that has to start a receiver on worker
- * nodes to receive external data. Specific implementations of NetworkInputDStream must
+ * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
+ * that has to start a receiver on worker nodes to receive external data.
+ * Specific implementations of NetworkInputDStream must
  * define the getReceiver() function that gets the receiver object of type
- * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
- * data.
+ * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
+ * to the workers to receive data.
  * @param ssc_ Streaming context that will execute this input stream
  * @tparam T Class type of the object of this stream
  */
```
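To show what self-registration means for subclasses, here is a hypothetical driver-side input stream (the class name and its contents are invented for illustration): merely instantiating it adds it to the graph, because InputDStream's constructor now calls ssc.graph.addInputStream(this).

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical input stream that emits a small RDD every batch, entirely
// from the driver (no receiver). Constructing it registers it with the
// DStreamGraph via InputDStream's constructor.
class RangeInputDStream(ssc_ : StreamingContext)
  extends InputDStream[Int](ssc_) {

  override def start() { }  // no service to start on the driver
  override def stop() { }   // nothing to clean up

  override def compute(validTime: Time): Option[RDD[Int]] = {
    Some(ssc_.sparkContext.makeRDD(1 to 100))
  }
}
```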
```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 6b3e48382e..f57762321c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -35,6 +35,11 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.streaming.{Time, Duration}
 
+/**
+ * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
+ * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
+ * these functions.
+ */
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
   extends Serializable {
```

```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
new file mode 100644
index 0000000000..4dd985cf5a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Spark Streaming functionality. [[org.apache.spark.streaming.StreamingContext]] serves as the main
+ * entry point to Spark Streaming, while [[org.apache.spark.streaming.dstream.DStream]] is the data
+ * type representing a continuous sequence of RDDs, representing a continuous stream of data.
+ *
+ * In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations
+ * available only on DStreams
+ * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically
+ * available on any DStream of the right type (e.g. DStream[(Int, Int)] through implicit
+ * conversions when you `import org.apache.spark.streaming.StreamingContext._`.
+ *
+ * For the Java API of Spark Streaming, take a look at the
+ * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and
+ * the [[org.apache.spark.streaming.api.java.JavaDStream]] and the
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]] which have the DStream functionality.
+ */
+package object streaming {
+  // For package docs only
+}
```
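To make the new PairDStreamFunctions scaladoc concrete, a small sketch (source, host, and port are placeholders):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
// The import below enables the implicit conversion to PairDStreamFunctions.
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext("local[2]", "PairOpsExample", Seconds(1))
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// groupByKey is not defined on DStream itself; it becomes available on
// DStream[(String, Int)] through the implicit conversion.
val grouped = words.map(w => (w, 1)).groupByKey()
grouped.print()
```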
```diff
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index be67af3a64..54813934b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -191,7 +191,7 @@ object MasterFailureTest extends Logging {
     val inputStream = ssc.textFileStream(testDir.toString)
     val operatedStream = operation(inputStream)
     val outputStream = new TestOutputStream(operatedStream)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc
   }
```
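Since register() is now private[streaming], code outside the streaming package cannot follow MasterFailureTest's pattern; a hypothetical external test would materialize the stream through an output operation instead (the input directory below is invented):

```scala
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "CollectOutputExample", Seconds(1))
val input = ssc.textFileStream("/tmp/test-input")  // hypothetical directory

// foreachRDD registers the output stream internally, so per-batch results
// can be collected without access to the private register() method.
val collected = new ArrayBuffer[Seq[String]]() with SynchronizedBuffer[Seq[String]]
input.foreachRDD { rdd => collected += rdd.collect().toSeq }

ssc.start()
```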