author    Patrick Wendell <pwendell@gmail.com>    2013-01-02 14:08:15 -0800
committer Patrick Wendell <pwendell@gmail.com>    2013-01-02 14:08:15 -0800
commit    96a6ff0b09f276fb38656bb753592b1deeff5dd1 (patch)
tree      078a96a58c79ef2fd1051d44ba9aee26a7e249a8
parent    493d65ce651dffc79adcdada0eeeed6452b3cc47 (diff)
parent    02497f0cd49a24ebc8b92d3471de250319fe56cd (diff)
Merge branch 'dev-merge' into datahandler-fix
Conflicts:
	streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala
-rw-r--r--  .gitignore | 2
-rw-r--r--  core/src/main/scala/spark/RDD.scala | 1
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala | 1
-rwxr-xr-x  docs/_layouts/global.html | 12
-rw-r--r--  docs/_plugins/copy_api_dirs.rb | 4
-rw-r--r--  docs/api.md | 5
-rw-r--r--  docs/configuration.md | 11
-rw-r--r--  docs/streaming-programming-guide.md | 193
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 499
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 1
-rw-r--r--  streaming/src/main/scala/spark/streaming/Interval.scala | 1
-rw-r--r--  streaming/src/main/scala/spark/streaming/Job.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 164
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala | 37
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala) | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FileInputDStream.scala) | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala) | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala | 17
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala | 19
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala) | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala) | 15
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/QueueInputDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/RawInputDStream.scala) | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala) | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala | 27
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/SocketInputDStream.scala) | 13
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala (renamed from streaming/src/main/scala/spark/streaming/StateDStream.scala) | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala | 19
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala | 40
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/WindowedDStream.scala) | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/Clock.scala | 8
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 50
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 12
54 files changed, 919 insertions, 538 deletions
diff --git a/.gitignore b/.gitignore
index c207409e3c..88d7b56181 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/spark-env.sh
+conf/streaming-env.sh
conf/log4j.properties
docs/_site
docs/api
@@ -31,4 +32,5 @@ project/plugins/src_managed/
logs/
log/
spark-tests.log
+streaming-tests.log
dependency-reduced-pom.xml
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 59e50a0b6b..1574533430 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -101,7 +101,6 @@ abstract class RDD[T: ClassManifest](
val partitioner: Option[Partitioner] = None
-
// =======================================================================
// Methods and fields available on all RDDs
// =======================================================================
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index f40b56be64..1b219473e0 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,6 +1,7 @@
package spark.rdd
import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Split, TaskContext}
+import spark.SparkContext._
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index d656b3e3de..a8be52f23e 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -47,11 +47,19 @@
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="scala-programming-guide.html">Scala</a></li>
<li><a href="java-programming-guide.html">Java</a></li>
- <li><a href="streaming-programming-guide.html">Spark Streaming (Alpha)</a></li>
+ <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
</ul>
</li>
- <li><a href="api/core/index.html">API (Scaladoc)</a></li>
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown">API (Scaladoc)<b class="caret"></b></a>
+ <ul class="dropdown-menu">
+ <li><a href="api/core/index.html">Spark</a></li>
+ <li><a href="api/examples/index.html">Spark Examples</a></li>
+ <li><a href="api/streaming/index.html">Spark Streaming</a></li>
+ <li><a href="api/bagel/index.html">Bagel</a></li>
+ </ul>
+ </li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb
index e61c105449..7654511eeb 100644
--- a/docs/_plugins/copy_api_dirs.rb
+++ b/docs/_plugins/copy_api_dirs.rb
@@ -2,7 +2,7 @@ require 'fileutils'
include FileUtils
if ENV['SKIP_SCALADOC'] != '1'
- projects = ["core", "examples", "repl", "bagel"]
+ projects = ["core", "examples", "repl", "bagel", "streaming"]
puts "Moving to project root and building scaladoc."
curr_dir = pwd
@@ -11,7 +11,7 @@ if ENV['SKIP_SCALADOC'] != '1'
puts "Running sbt/sbt doc from " + pwd + "; this may take a few minutes..."
puts `sbt/sbt doc`
- puts "moving back into docs dir."
+ puts "Moving back into docs dir."
cd("docs")
# Copy over the scaladoc from each project into the docs directory.
diff --git a/docs/api.md b/docs/api.md
index 43548b223c..3a13e30f82 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -5,6 +5,7 @@ title: Spark API documentation (Scaladoc)
Here you can find links to the Scaladoc generated for the Spark sbt subprojects. If the following links don't work, try running `sbt/sbt doc` from the Spark project home directory.
-- [Core](api/core/index.html)
-- [Examples](api/examples/index.html)
+- [Spark](api/core/index.html)
+- [Spark Examples](api/examples/index.html)
+- [Spark Streaming](api/streaming/index.html)
- [Bagel](api/bagel/index.html)
diff --git a/docs/configuration.md b/docs/configuration.md
index d8317ea97c..87cb4a6797 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -219,6 +219,17 @@ Apart from these, the following properties are also available, and may be useful
Port for the master to listen on.
</td>
</tr>
+<tr>
+ <td>spark.cleaner.delay</td>
+ <td>(disable)</td>
+ <td>
+ Duration (in minutes) for which Spark will remember any metadata (stages generated, tasks generated, etc.).
+ Periodic cleanups will ensure that metadata older than this duration is forgotten. This is
+ useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming
+ applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
+ </td>
+</tr>
+
</table>
# Configuring Logging
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 90916545bc..fc2ea2ef79 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1,31 +1,48 @@
---
layout: global
-title: Streaming (Alpha) Programming Guide
+title: Spark Streaming Programming Guide
---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+# Overview
+A Spark Streaming application is very similar to a Spark application; it consists of a *driver program* that runs the user's `main` function and continuously executes various *parallel operations* on input streams of data. The main abstraction Spark Streaming provides is a *discretized stream* (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or they can be generated by transforming existing DStreams using parallel operators like map, reduce, and window. The basic processing model is as follows:
+(i) While a Spark Streaming driver program is running, the system receives data from various sources and divides the data into batches. Each batch of data is treated as an RDD, that is, an immutable, parallel collection of data. These input data RDDs are automatically persisted in memory (serialized by default) and replicated to two nodes for fault-tolerance. This sequence of RDDs is collectively referred to as an InputDStream.
+(ii) Data received by InputDStreams are processed using DStream operations. Since all data is represented as RDDs and all DStream operations as RDD operations, data is automatically recovered in the event of node failures.
+
+This guide shows how to start programming with DStreams.
+
# Initializing Spark Streaming
-The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created from an existing `SparkContext`, or directly:
+The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
{% highlight scala %}
-new StreamingContext(master, jobName, [sparkHome], [jars])
-new StreamingContext(sparkContext)
+new StreamingContext(master, jobName, batchDuration)
{% endhighlight %}
-Once a context is instantiated, the batch interval must be set:
+The `master` parameter is either the [Mesos master URL](running-on-mesos.html) (for running on a cluster) or the special "local" string (for local mode), and is used to create the underlying SparkContext. For more information about this, please refer to the [Spark programming guide](scala-programming-guide.html). The `jobName` is the name of the streaming job, the same as the jobName used in SparkContext; it is used to identify this job in the Mesos web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Starting with something conservative like 5 seconds is a good idea. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion.
+This constructor creates a SparkContext object using the given `master` and `jobName` parameters. However, if you already have a SparkContext or you need to create a custom SparkContext by specifying a list of JARs, then a StreamingContext can be created from the existing SparkContext by using
{% highlight scala %}
-context.setBatchDuration(Milliseconds(2000))
+new StreamingContext(sparkContext, batchDuration)
{% endhighlight %}
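+
+For illustration, here is a minimal sketch of the first form (the master URL, job name, and 2-second batch interval are arbitrary example values, not requirements):
+
+{% highlight scala %}
+import spark.streaming.{Seconds, StreamingContext}
+
+// A local context using 2 threads, generating batches every 2 seconds
+val ssc = new StreamingContext("local[2]", "MyStreamingJob", Seconds(2))
+{% endhighlight %}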
-# DStreams - Discretized Streams
-The primary abstraction in Spark Streaming is a DStream. A DStream represents distributed collection which is computed periodically according to a specified batch interval. DStream's can be chained together to create complex chains of transformation on streaming data. DStreams can be created by operating on existing DStreams or from an input source. To creating DStreams from an input source, use the StreamingContext:
+
+# Attaching Input Sources - InputDStreams
+The StreamingContext is used to create InputDStreams from input sources:
{% highlight scala %}
-context.neworkStream(host, port) // A stream that reads from a socket
-context.flumeStream(hosts, ports) // A stream populated by a Flume flow
+// Assuming ssc is the StreamingContext
+ssc.networkStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
+ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in an HDFS directory
{% endhighlight %}
-# DStream Operators
+A complete list of input sources is available in the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext). Data received from these sources can be processed using DStream operations, which are explained next.
+
+
+
+# DStream Operations
Once an input stream has been created, you can transform it using _stream operators_. Most of these operators return new DStreams which you can further transform. Eventually, you'll need to call an _output operator_, which forces evaluation of the stream by writing data out to an external source.
## Transformations
@@ -73,20 +90,13 @@ DStreams support many of the transformations available on normal Spark RDD's:
<td> <b>cogroup</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td>
<td> When called on streams of type (K, V) and (K, W), returns a stream of (K, Seq[V], Seq[W]) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
-</table>
-
-DStreams also support the following additional transformations:
-
-<table class="table">
<tr>
<td> <b>reduce</b>(<i>func</i>) </td>
<td> Create a new single-element stream by aggregating the elements of the stream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed correctly in parallel. </td>
</tr>
</table>
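+
+As a brief sketch of how these transformations compose (assuming `lines` is a DStream of log lines, e.g. created with `networkTextStream` as in the example later in this guide):
+
+{% highlight scala %}
+// Keep only the lines containing "ERROR", then count them in each batch
+val errorLines = lines.filter(_.contains("ERROR"))
+val errorCounts = errorLines.count()
+{% endhighlight %}
+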
-
-## Windowed Transformations
-Spark streaming features windowed computations, which allow you to report statistics over a sliding window of data. All window functions take a <i>windowTime</i>, which represents the width of the window and a <i>slideTime</i>, which represents the frequency during which the window is calculated.
+Spark Streaming features windowed computations, which allow you to report statistics over a sliding window of data. All window functions take a <i>windowTime</i>, which represents the width of the window, and a <i>slideTime</i>, which represents the frequency at which the window is computed (a short sketch follows the table below).
<table class="table">
<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
@@ -128,7 +138,7 @@ Spark streaming features windowed computations, which allow you to report statis
</table>
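+
+For example, a minimal sketch of counting words over the last 30 seconds of data, computed every 10 seconds (assuming `words` is a DStream of words as in the WordCountNetwork example later in this guide, and assuming a `reduceByKeyAndWindow(reduceFunc, windowTime, slideTime)` variant):
+
+{% highlight scala %}
+// Word counts over a 30-second window, sliding every 10 seconds
+val windowedCounts = words.map(word => (word, 1))
+                          .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
+{% endhighlight %}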
-## Output Operators
+## Output Operations
When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:
<table class="table">
@@ -140,24 +150,159 @@ When an output operator is called, it triggers the computation of a stream. Curr
<tr>
<td> <b>print</b>() </td>
- <td> Prints the contents of this DStream on the driver. At each interval, this will take at most ten elements from the DStream's RDD and print them. </td>
+ <td> Prints the first ten elements of every batch of data in a DStream on the driver. </td>
</tr>
<tr>
- <td> <b>saveAsObjectFile</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
+ <td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
<td> Save this DStream's contents as a <code>SequenceFile</code> of serialized objects. The file name at each batch interval is calculated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>.
</td>
</tr>
<tr>
- <td> <b>saveAsTextFile</b>(<i>prefix</i>, <i>suffix</i>) </td>
+ <td> <b>saveAsTextFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
<td> Save this DStream's contents as text files. The file name at each batch interval is calculated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
</tr>
<tr>
- <td> <b>saveAsHadoopFiles</b>(<i>prefix</i>, <i>suffix</i>) </td>
+ <td> <b>saveAsHadoopFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
<td> Save this DStream's contents as a Hadoop file. The file name at each batch interval is calculated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
</tr>
</table>
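+
+As a minimal sketch of using these output operations (assuming `wordCounts` is a DStream of (word, count) pairs as in the example later in this guide; the HDFS path prefix is a hypothetical placeholder):
+
+{% highlight scala %}
+// Print a few elements of every batch, and also save each batch as text files
+wordCounts.print()
+wordCounts.saveAsTextFiles("hdfs://namenode:9000/counts", "txt")
+{% endhighlight %}
+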
+## DStream Persistence
+Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using the `persist()` method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple DStream operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`.
+
+Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the [Performance Tuning](#memory-tuning) section. More information on different persistence levels can be found in [Spark Programming Guide](scala-programming-guide.html#rdd-persistence).
+
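+As a sketch, persisting a DStream whose data will be reused by multiple operations (again assuming `wordCounts` as in the example later in this guide):
+
+{% highlight scala %}
+// Keep every RDD of this DStream in memory, serialized (the default level)
+wordCounts.persist()
+{% endhighlight %}
+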
+# Starting the Streaming computation
+All the above DStream operations are completely lazy, that is, the operations will start executing only after the context is started by using
+{% highlight scala %}
+ssc.start()
+{% endhighlight %}
+
+Conversely, the computation can be stopped by using
+{% highlight scala %}
+ssc.stop()
+{% endhighlight %}
+
+# Example - WordCountNetwork.scala
+A good example to start off with is spark.streaming.examples.WordCountNetwork. This example counts the words received from a network server every second. Given below are the relevant sections of the source code. You can find the full source code in <Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala.
+
+{% highlight scala %}
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+...
+
+// Create the context and set up a network input stream to receive from a host:port
+val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
+val lines = ssc.networkTextStream(args(1), args(2).toInt)
+
+// Split the lines into words, count them, and print some of the counts on the master
+val words = lines.flatMap(_.split(" "))
+val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+wordCounts.print()
+
+// Start the computation
+ssc.start()
+{% endhighlight %}
+
+To run this example on your local machine, you need to first run a Netcat server by using
+
+{% highlight bash %}
+$ nc -lk 9999
+{% endhighlight %}
+
+Then, in a different terminal, you can start WordCountNetwork by using
+
+{% highlight bash %}
+$ ./run spark.streaming.examples.WordCountNetwork local[2] localhost 9999
+{% endhighlight %}
+
+This will make WordCountNetwork connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen.
+
+<table>
+<td>
+{% highlight bash %}
+# TERMINAL 1
+# RUNNING NETCAT
+
+$ nc -lk 9999
+hello world
+
+
+
+
+
+...
+{% endhighlight %}
+</td>
+<td>
+{% highlight bash %}
+# TERMINAL 2: RUNNING WordCountNetwork
+...
+2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s
+-------------------------------------------
+Time: 1357008430000 ms
+-------------------------------------------
+(hello,1)
+(world,1)
+
+2012-12-31 18:47:10,447 INFO JobManager: Total delay: 0.44700 s for job 8 (execution: 0.44000 s)
+...
+{% endhighlight %}
+</td>
+</table>
+
+
+
+# Performance Tuning
+Getting the best performance of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:
+<ol>
+<li>Reducing the processing time of each batch of data by efficiently using cluster resources.</li>
+<li>Setting the right batch size such that the data processing can keep up with the data ingestion.</li>
+</ol>
+
+## Reducing the Processing Time of each Batch
+There are a number of optimizations that can be done in Spark to minimize the processing time of each batch. These have been discussed in detail in the [Tuning Guide](tuning.html). This section highlights some of the most important ones.
+
+### Level of Parallelism
+Cluster resources may be underutilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like `reduceByKey` and `reduceByKeyAndWindow`, the default number of parallel tasks is 8. You can pass the level of parallelism as an argument (see the [`spark.PairDStreamFunctions`](api/streaming/index.html#spark.PairDStreamFunctions) documentation), or set the system property `spark.default.parallelism` to change the default.
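+
+As a sketch (the task count of 16 is an arbitrary illustration; `words` is assumed to be a DStream of words as in the earlier WordCountNetwork example):
+
+{% highlight scala %}
+// Explicitly use 16 reduce tasks for this operation
+val counts = words.map(word => (word, 1)).reduceByKey(_ + _, 16)
+
+// Or raise the default for all such operations (set before creating the context)
+System.setProperty("spark.default.parallelism", "16")
+{% endhighlight %}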
+
+### Data Serialization
+The overhead of data serialization can be significant, especially when sub-second batch sizes are to be achieved. There are two aspects to it.
+* Serialization of RDD data in Spark: Please refer to the detailed discussion on data serialization in the [Tuning Guide](tuning.html). However, note that unlike Spark, in Spark Streaming RDDs are persisted as serialized byte arrays by default, to minimize pauses related to GC (a short sketch follows this list).
+* Serialization of input data: To ingest external data into Spark, data received as bytes (say, from the network) needs to be deserialized and then re-serialized into Spark's serialization format. Hence, the deserialization overhead of input data may be a bottleneck.
+
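+As a sketch, enabling Kryo for Spark's data serialization (this assumes the `spark.serializer` property described in the [Tuning Guide](tuning.html); like other Spark properties, it must be set before the context is created):
+
+{% highlight scala %}
+// Use Kryo instead of Java serialization for RDD data
+System.setProperty("spark.serializer", "spark.KryoSerializer")
+val ssc = new StreamingContext("local[2]", "MyStreamingJob", Seconds(1))
+{% endhighlight %}
+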
+### Task Launching Overheads
+If the number of tasks launched per second is high (say, 50 or more per second), then the overhead of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second latencies. The overhead can be reduced by the following changes:
+* Task Serialization: Using Kryo serialization for serializing tasks can reduce the task sizes, and therefore reduce the time taken to send them to the slaves.
+* Execution mode: Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the [Running on Mesos guide](running-on-mesos.html) for more details.
+These changes may reduce batch processing time by hundreds of milliseconds, thus allowing sub-second batch sizes to be viable.
+
+## Setting the Right Batch Size
+For a Spark Streaming application running on a cluster to be stable, the processing of the data streams must keep up with the rate of ingestion of the data streams. Depending on the type of computation, the batch size used may have a significant impact on the rate of ingestion that can be sustained by the Spark Streaming application on a fixed set of cluster resources. For example, let us consider the earlier WordCountNetwork example. For a particular data rate, the system may be able to keep up with reporting word counts every 2 seconds (i.e., batch size of 2 seconds), but not every 500 milliseconds.
+
+A good approach to figure out the right batch size for your application is to test it with a conservative batch size (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with the data rate, you can check the value of the end-to-end delay experienced by each processed batch (in the Spark master logs, find the line having the phrase "Total delay"). If the delay stays less than the batch size, then the system is stable. Otherwise, if the delay is continuously increasing, it means that the system is unable to keep up and is therefore unstable. Once you have an idea of a stable configuration, you can try increasing the data rate and/or reducing the batch size. Note that a momentary increase in the delay due to temporary data rate increases may be fine as long as the delay reduces back to a low value (i.e., less than the batch size).
+
+## 24/7 Operation
+By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic cleanup of its metadata. This can be enabled by setting the Java system property `spark.cleaner.delay` to the number of minutes you want any metadata to persist. For example, setting `spark.cleaner.delay` to 10 would cause Spark to periodically clean up all metadata and persisted RDDs that are older than 10 minutes. Note that this property needs to be set before the SparkContext is created.
+
+This value is closely tied to any window operation that is being used. Any window operation would require the input data to be persisted in memory for at least the duration of the window. Hence it is necessary to set the delay to at least the duration of the largest window operation used in the Spark Streaming application. If this delay is set too low, the application will throw an exception saying so. A minimal sketch of setting this property is shown below.
+
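+As a minimal sketch (the 30-minute value is an arbitrary illustration; it should be at least as large as the largest window used):
+
+{% highlight scala %}
+// Must be set before the StreamingContext (and hence the SparkContext) is created
+System.setProperty("spark.cleaner.delay", "30")
+val ssc = new StreamingContext("local[2]", "MyStreamingJob", Seconds(1))
+{% endhighlight %}
+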
+## Memory Tuning
+Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail in the [Tuning Guide](tuning.html). It is recommended that you read it. In this section, we highlight a few customizations that are strongly recommended to minimize GC-related pauses in Spark Streaming applications and achieve more consistent batch processing times.
+
+* <b>Default persistence level of DStreams</b>: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, [StorageLevel.MEMORY_ONLY_SER](api/core/index.html#spark.storage.StorageLevel$) for DStreams compared to [StorageLevel.MEMORY_ONLY](api/core/index.html#spark.storage.StorageLevel$) for RDDs). Even though keeping the data serialized incurs higher serialization overhead, it significantly reduces GC pauses (a sketch of overriding this default follows this list).
+
+* <b>Concurrent garbage collector</b>: Using the concurrent mark-and-sweep GC further minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times.
+
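+As a sketch, an application that prefers lower CPU cost over shorter GC pauses could persist a DStream deserialized instead (assuming `wordCounts` as in the earlier example):
+
+{% highlight scala %}
+import spark.storage.StorageLevel
+
+// Override the MEMORY_ONLY_SER default and keep the data deserialized in memory
+wordCounts.persist(StorageLevel.MEMORY_ONLY)
+{% endhighlight %}
+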
+# Master Fault-tolerance (Alpha)
+TODO
+
+* Checkpointing of DStream graph
+
+* Recovery from master faults
+
+* Current state and future directions
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 770f7b0cc0..11a7232d7b 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
+private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
@@ -30,6 +31,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
+private[streaming]
class CheckpointWriter(checkpointDir: String) extends Logging {
val file = new Path(checkpointDir, "graph")
val conf = new Configuration()
@@ -65,7 +67,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
}
-
+private[streaming]
object CheckpointReader extends Logging {
def read(path: String): Checkpoint = {
@@ -103,6 +105,7 @@ object CheckpointReader extends Logging {
}
}
+private[streaming]
class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d5048aeed7..beba9cfd4f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -1,17 +1,15 @@
package spark.streaming
+import spark.streaming.dstream._
import StreamingContext._
import Time._
-import spark._
-import spark.SparkContext._
-import spark.rdd._
+import spark.{RDD, Logging}
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import java.util.concurrent.ArrayBlockingQueue
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
@@ -21,7 +19,7 @@ import org.apache.hadoop.conf.Configuration
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations
+ * HDFS, Kafka or Flume) or it can be generated by transforming existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
* DStream periodically generates a RDD, either from live data or by transforming the RDD generated
* by a parent DStream.
@@ -38,33 +36,28 @@ import org.apache.hadoop.conf.Configuration
* - A function that is used to generate an RDD after each time interval
*/
-case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-
-abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
-extends Serializable with Logging {
+abstract class DStream[T: ClassManifest] (
+ @transient protected[streaming] var ssc: StreamingContext
+ ) extends Serializable with Logging {
initLogging()
- /**
- * ----------------------------------------------
- * Methods that must be implemented by subclasses
- * ----------------------------------------------
- */
+ // =======================================================================
+ // Methods that should be implemented by subclasses of DStream
+ // =======================================================================
- // Time interval at which the DStream generates an RDD
+ /** Time interval after which the DStream generates an RDD */
def slideTime: Time
- // List of parent DStreams on which this DStream depends on
+ /** List of parent DStreams on which this DStream depends */
def dependencies: List[DStream[_]]
- // Key method that computes RDD for a valid time
+ /** Method that generates an RDD for the given time */
def compute (validTime: Time): Option[RDD[T]]
- /**
- * ---------------------------------------
- * Other general fields and methods of DStream
- * ---------------------------------------
- */
+ // =======================================================================
+ // Methods and fields available on all DStreams
+ // =======================================================================
// RDDs generated, marked as protected[streaming] so that testsuites can access it
@transient
@@ -87,12 +80,15 @@ extends Serializable with Logging {
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
- def isInitialized = (zeroTime != null)
+ protected[streaming] def isInitialized = (zeroTime != null)
// Duration for which the DStream requires its parent DStream to remember each RDD created
- def parentRememberDuration = rememberDuration
+ protected[streaming] def parentRememberDuration = rememberDuration
+
+ /** Returns the StreamingContext associated with this DStream */
+ def context() = ssc
- // Set caching level for the RDDs created by this DStream
+ /** Persists the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -102,11 +98,16 @@ extends Serializable with Logging {
this
}
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
-
- // Turn on the default caching level for this RDD
+
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
def checkpoint(interval: Time): DStream[T] = {
if (isInitialized) {
throw new UnsupportedOperationException(
@@ -188,13 +189,13 @@ extends Serializable with Logging {
val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
- metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+ metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
"delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
"the Java property 'spark.cleaner.delay' to more than " +
- math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+ math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
)
dependencies.foreach(_.validate())
@@ -285,7 +286,7 @@ extends Serializable with Logging {
* Generates a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. PerRDDForEachDStream).
+ * (eg. ForEachDStream).
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -420,65 +421,96 @@ extends Serializable with Logging {
generatedRDDs = new HashMap[Time, RDD[T]] ()
}
- /**
- * --------------
- * DStream operations
- * --------------
- */
+ // =======================================================================
+ // DStream operations
+ // =======================================================================
+
+ /** Returns a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, ssc.sc.clean(mapFunc))
}
+ /**
+ * Returns a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
}
+ /** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
def glom(): DStream[Array[T]] = new GlommedDStream(this)
- def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]): DStream[U] = {
- new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U: ClassManifest](
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean = false
+ ): DStream[U] = {
+ new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning)
}
- def reduce(reduceFunc: (T, T) => T): DStream[T] = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _)
-
- def collect(): DStream[Seq[T]] = this.map(x => (null, x)).groupByKey(1).map(_._2)
-
- def foreach(foreachFunc: T => Unit) {
- val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
- newStream
- }
- def foreachRDD(foreachFunc: RDD[T] => Unit) {
- foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: RDD[T] => Unit) {
+ foreach((r: RDD[T], t: Time) => foreachFunc(r))
}
- def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
- def transformRDD[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transformRDD((r: RDD[T], t: Time) => transformFunc(r))
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ transform((r: RDD[T], t: Time) => transformFunc(r))
}
- def transformRDD[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- def toBlockingQueue() = {
- val queue = new ArrayBlockingQueue[RDD[T]](10000)
- this.foreachRDD(rdd => {
- queue.add(rdd)
- })
- queue
- }
-
+ /**
+ * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and therefore materialized.
+ */
def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
@@ -489,18 +521,42 @@ extends Serializable with Logging {
if (first11.size > 10) println("...")
println()
}
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
}
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @return
+ */
def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime)
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowTime duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideTime sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
def window(windowTime: Time, slideTime: Time): DStream[T] = {
new WindowedDStream(this, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream which is computed based on a tumbling window on this DStream.
+ * This is equivalent to window(batchTime, batchTime).
+ * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ */
def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ */
def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = {
this.window(windowTime, slideTime).reduce(reduceFunc)
}
@@ -516,17 +572,31 @@ extends Serializable with Logging {
.map(_._2)
}
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ */
def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ */
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
- def slice(interval: Interval): Seq[RDD[T]] = {
+ /**
+ * Returns all the RDDs defined by the Interval object (both end times included)
+ */
+ protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
}
- // Get all the RDDs between fromTime to toTime (both included)
+ /**
+ * Returns all the RDDs between 'fromTime' to 'toTime' (both included)
+ */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
var time = toTime.floor(slideTime)
@@ -540,20 +610,26 @@ extends Serializable with Logging {
rdds.toSeq
}
+ /**
+ * Saves each RDD in this DStream as a Sequence file of serialized objects.
+ */
def saveAsObjectFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
+ /**
+ * Saves each RDD in this DStream as a text file, using string representation of elements.
+ */
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
def register() {
@@ -561,293 +637,6 @@ extends Serializable with Logging {
}
}
+private[streaming]
+case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
- extends DStream[T](ssc_) {
-
- override def dependencies = List()
-
- override def slideTime = {
- if (ssc == null) throw new Exception("ssc is null")
- if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
- ssc.graph.batchDuration
- }
-
- def start()
-
- def stop()
-}
-
-
-/**
- * TODO
- */
-
-class MappedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- mapFunc: T => U
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.map[U](mapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- flatMapFunc: T => Traversable[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FilteredDStream[T: ClassManifest](
- parent: DStream[T],
- filterFunc: T => Boolean
- ) extends DStream[T](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- parent.getOrCompute(validTime).map(_.filter(filterFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- mapPartFunc: Iterator[T] => Iterator[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class GlommedDStream[T: ClassManifest](parent: DStream[T])
- extends DStream[Array[T]](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Array[T]]] = {
- parent.getOrCompute(validTime).map(_.glom())
- }
-}
-
-
-/**
- * TODO
- */
-
-class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
- parent: DStream[(K,V)],
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiner: (C, C) => C,
- partitioner: Partitioner
- ) extends DStream [(K,C)] (parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K,C)]] = {
- parent.getOrCompute(validTime) match {
- case Some(rdd) =>
- Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- mapValueFunc: V => U
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- flatMapValueFunc: V => TraversableOnce[U]
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
- }
-}
-
-
-
-/**
- * TODO
- */
-
-class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
- extends DStream[T](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideTime).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideTime: Time = parents.head.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- parents.map(_.getOrCompute(validTime)).foreach(_ match {
- case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
- })
- if (rdds.size > 0) {
- Some(new UnionRDD(ssc.sc, rdds))
- } else {
- None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class PerElementForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: T => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- val sparkJobFunc = {
- (iterator: Iterator[T]) => iterator.foreach(foreachFunc)
- }
- ssc.sc.runJob(rdd, sparkJobFunc)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class PerRDDForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: (RDD[T], Time) => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- foreachFunc(rdd, time)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- transformFunc: (RDD[T], Time) => RDD[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(transformFunc(_, validTime))
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index d0a9ade61d..c72429370e 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.InputDStream
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import collection.mutable.ArrayBuffer
import spark.Logging
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index ffb7725ac9..fa0b7ce19d 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -1,5 +1,6 @@
package spark.streaming
+private[streaming]
case class Interval(beginTime: Time, endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs))
diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala
index 0bcb6fd8dc..67bd8388bc 100644
--- a/streaming/src/main/scala/spark/streaming/Job.scala
+++ b/streaming/src/main/scala/spark/streaming/Job.scala
@@ -2,6 +2,7 @@ package spark.streaming
import java.util.concurrent.atomic.AtomicLong
+private[streaming]
class Job(val time: Time, func: () => _) {
val id = Job.getNewId()
def run(): Long = {
@@ -14,6 +15,7 @@ class Job(val time: Time, func: () => _) {
override def toString = "streaming job " + id + " @ " + time
}
+private[streaming]
object Job {
val id = new AtomicLong(0)
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 9bf9251519..3b910538e0 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -5,6 +5,7 @@ import spark.SparkEnv
import java.util.concurrent.Executors
+private[streaming]
class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
@@ -13,7 +14,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
try {
val timeTaken = job.run()
logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
- (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
+ (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
logError("Running " + job + " failed", e)
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b421f795ee..a6ab44271f 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -1,5 +1,7 @@
package spark.streaming
+import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
+import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv
@@ -11,10 +13,10 @@ import akka.pattern.ask
import akka.util.duration._
import akka.dispatch._
-trait NetworkInputTrackerMessage
-case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
-case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
class NetworkInputTracker(
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 720e63bba0..b0a208e67f 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -1,6 +1,9 @@
package spark.streaming
import spark.streaming.StreamingContext._
+import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
+import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
@@ -218,13 +221,13 @@ extends Serializable {
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
- new MapValuesDStream[K, V, U](self, mapValuesFunc)
+ new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
- new FlatMapValuesDStream[K, V, U](self, flatMapValuesFunc)
+ new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -281,7 +284,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
@@ -303,7 +306,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
private def getKeyClass() = implicitly[ClassManifest[K]].erasure
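For readers tracking the rename, here is a hedged sketch of these operations in use (pairs is an assumed, pre-built DStream[(String, Int)]; the import pulls in the implicit conversion that now lives on the StreamingContext companion object):

  import spark.streaming.DStream
  import spark.streaming.StreamingContext._   // implicit DStream[(K, V)] => PairDStreamFunctions

  def demo(pairs: DStream[(String, Int)]) {
    val scaled   = pairs.mapValues(_ * 10)             // now backed by MapValuedDStream
    val repeated = scaled.flatMapValues(v => 1 to v)   // now backed by FlatMapValuedDStream
    repeated.foreach((rdd, t) => println("batch " + t + ": " + rdd.count() + " pairs"))
  }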
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 014021be61..eb40affe6d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -4,14 +4,8 @@ import util.{ManualClock, RecurringTimer, Clock}
import spark.SparkEnv
import spark.Logging
-import scala.collection.mutable.HashMap
-
-
-sealed trait SchedulerMessage
-case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage
-
-class Scheduler(ssc: StreamingContext)
-extends Logging {
+private[streaming]
+class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
@@ -28,7 +22,7 @@ extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))
def start() {
// If context was started from checkpoint, then restart timer such that
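Because the clock is resolved by class name at runtime, deterministic tests can swap in the manual clock before the context is started; a minimal sketch (application name and batch size are illustrative):

  import spark.streaming.{StreamingContext, Seconds}

  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
  val ssc = new StreamingContext("local[2]", "ManualClockTest", Seconds(1))
  // The scheduler's RecurringTimer now advances only when the test drives the ManualClock.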
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ce47bcb2da..7256e41af9 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,10 @@
package spark.streaming
-import spark.RDD
-import spark.Logging
-import spark.SparkEnv
-import spark.SparkContext
+import spark.streaming.dstream._
+
+import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
+import spark.util.MetadataCleaner
import scala.collection.mutable.Queue
@@ -15,10 +15,8 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
-import spark.util.MetadataCleaner
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -48,7 +46,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Recreates the StreamingContext from a checkpoint file.
+ * Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -61,7 +59,7 @@ class StreamingContext private (
"both SparkContext and checkpoint as null")
}
- val isCheckpointPresent = (cp_ != null)
+ protected[streaming] val isCheckpointPresent = (cp_ != null)
val sc: SparkContext = {
if (isCheckpointPresent) {
@@ -71,9 +69,9 @@ class StreamingContext private (
}
}
- val env = SparkEnv.get
+ protected[streaming] val env = SparkEnv.get
- val graph: DStreamGraph = {
+ protected[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
@@ -86,10 +84,10 @@ class StreamingContext private (
}
}
- private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
- private[streaming] var networkInputTracker: NetworkInputTracker = null
+ protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+ protected[streaming] var networkInputTracker: NetworkInputTracker = null
- private[streaming] var checkpointDir: String = {
+ protected[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
cp_.checkpointDir
@@ -98,18 +96,31 @@ class StreamingContext private (
}
}
- private[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
- private[streaming] var receiverJobThread: Thread = null
- private[streaming] var scheduler: Scheduler = null
+ protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
+ protected[streaming] var receiverJobThread: Thread = null
+ protected[streaming] var scheduler: Scheduler = null
+ /**
+ * Sets each DStream in this context to remember the RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and release them for garbage
+ * collection. This method allows the developer to specify how long to remember the RDDs
+ * (if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
def remember(duration: Time) {
graph.remember(duration)
}
- def checkpoint(dir: String, interval: Time = null) {
- if (dir != null) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
- checkpointDir = dir
+ /**
+ * Sets the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ * @param interval checkpoint interval
+ */
+ def checkpoint(directory: String, interval: Time = null) {
+ if (directory != null) {
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
+ checkpointDir = directory
checkpointInterval = interval
} else {
checkpointDir = null
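A hedged usage sketch of the two settings documented above (master URL, checkpoint directory and durations are illustrative):

  import spark.streaming.{StreamingContext, Seconds, Minutes}

  val ssc = new StreamingContext("local[2]", "CheckpointExample", Seconds(1))
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoints", Seconds(10))   // periodic graph checkpoints for master fault-tolerance
  ssc.remember(Minutes(1))                                           // keep each batch's RDDs around for ad-hoc queries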
@@ -117,16 +128,15 @@ class StreamingContext private (
}
}
- private[streaming] def getInitialCheckpoint(): Checkpoint = {
+ protected[streaming] def getInitialCheckpoint(): Checkpoint = {
if (isCheckpointPresent) cp_ else null
}
- private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+ protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
- /**
+ /**
* Create an input stream that pulls messages form a Kafka Broker.
- *
- * @param host Zookeper hostname.
+ * @param hostname ZooKeeper hostname.
* @param port Zookeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@@ -148,6 +158,15 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Create an input stream from a network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as UTF-8 encoded, \n delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
def networkTextStream(
hostname: String,
port: Int,
@@ -156,6 +175,16 @@ class StreamingContext private (
networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
+ /**
+ * Create an input stream from a network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as objects using the given
+ * converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
def networkStream[T: ClassManifest](
hostname: String,
port: Int,
@@ -167,16 +196,32 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
def flumeStream (
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
graph.addInputStream(inputStream)
inputStream
}
-
+ /**
+ * Create an input stream from a network source hostname:port, where data is received
+ * as serialized blocks (serialized using Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
@@ -188,8 +233,12 @@ class StreamingContext private (
}
/**
- * This function creates a input stream that monitors a Hadoop-compatible filesystem
- * for new files and executes the necessary processing on them.
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new files
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassManifest,
@@ -201,13 +250,23 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat).
+ * @param directory HDFS directory to monitor for new files
+ */
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
/**
- * This function create a input stream from an queue of RDDs. In each batch,
- * it will process either one or all of the RDDs returned by the queue
+ * Creates an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD returned by the DStream when the queue is empty
+ * @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
@@ -219,34 +278,29 @@ class StreamingContext private (
inputStream
}
- def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
- val queue = new Queue[RDD[T]]
- val inputStream = queueStream(queue, true, null)
- queue ++= array
- inputStream
- }
-
+ /**
+ * Create a unified DStream from multiple DStreams of the same type and same slide time.
+ */
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/**
- * This function registers a InputDStream as an input stream that will be
- * started (InputDStream.start() called) to get the input data streams.
+ * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
graph.addInputStream(inputStream)
}
/**
- * This function registers a DStream as an output stream that will be
- * computed every interval.
+ * Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
}
- def validate() {
+ protected def validate() {
assert(graph != null, "Graph is null")
graph.validate()
@@ -258,7 +312,7 @@ class StreamingContext private (
}
/**
- * This function starts the execution of the streams.
+ * Starts the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointInterval == null && graph != null) {
@@ -286,7 +340,7 @@ class StreamingContext private (
}
/**
- * This function stops the execution of the streams.
+ * Stops the execution of the streams.
*/
def stop() {
try {
@@ -304,7 +358,11 @@ class StreamingContext private (
object StreamingContext {
- def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+ implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+ new PairDStreamFunctions[K, V](stream)
+ }
+
+ protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
@@ -314,13 +372,9 @@ object StreamingContext {
new SparkContext(master, frameworkName)
}
- implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
- new PairDStreamFunctions[K, V](stream)
- }
-
- def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+ protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
- time.millis.toString
+ time.milliseconds.toString
} else if (suffix == null || suffix.length ==0) {
prefix + "-" + time.milliseconds
} else {
@@ -328,7 +382,7 @@ object StreamingContext {
}
}
- def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+ protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
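Putting the relocated implicit together with the factory methods above, a minimal word-count sketch (master, host and port are illustrative):

  import spark.streaming.{StreamingContext, Seconds}
  import spark.streaming.StreamingContext._   // toPairDStreamFunctions

  val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
  val lines = ssc.networkTextStream("localhost", 9999)
  val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
  counts.foreach(rdd => println("counts: " + rdd.collect().mkString(", ")))
  ssc.start()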
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 480d292d7c..3c6fd5d967 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,11 +1,18 @@
package spark.streaming
-case class Time(millis: Long) {
+/**
+ * This is a simple class that represents time. Internally, it represents time as UTC.
+ * The recommended way to create instances of Time is to use helper objects
+ * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
+ * @param millis Time in UTC.
+ */
+
+case class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
-
+
def <= (that: Time): Boolean = (this.millis <= that.millis)
-
+
def > (that: Time): Boolean = (this.millis > that.millis)
def >= (that: Time): Boolean = (this.millis >= that.millis)
@@ -15,7 +22,9 @@ case class Time(millis: Long) {
def - (that: Time): Time = Time(millis - that.millis)
def * (times: Int): Time = Time(millis * times)
-
+
+ def / (that: Time): Long = millis / that.millis
+
def floor(that: Time): Time = {
val t = that.millis
val m = math.floor(this.millis / t).toLong
@@ -38,23 +47,33 @@ case class Time(millis: Long) {
def milliseconds: Long = millis
}
-object Time {
+private[streaming] object Time {
val zero = Time(0)
implicit def toTime(long: Long) = Time(long)
-
- implicit def toLong(time: Time) = time.milliseconds
}
+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of milliseconds.
+ */
object Milliseconds {
def apply(milliseconds: Long) = Time(milliseconds)
}
+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of seconds.
+ */
object Seconds {
def apply(seconds: Long) = Time(seconds * 1000)
-}
+}
-object Minutes {
+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of minutes.
+ */
+object Minutes {
def apply(minutes: Long) = Time(minutes * 60000)
}
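A short sketch of the arithmetic these helpers enable, including the new Long-valued division that the updated test suites below rely on:

  import spark.streaming.{Time, Seconds, Minutes}

  val batchDuration = Seconds(2)                              // Time(2000)
  val slideDuration = Minutes(1)                              // Time(60000)
  val batchesPerSlide: Long = slideDuration / batchDuration   // 30
  val aligned = Time(4500).floor(batchDuration)               // Time(4000)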
diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index 61d088eddb..bc23d423d3 100644
--- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -1,8 +1,10 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
+import spark.streaming.{Time, DStream}
+private[streaming]
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],
partitioner: Partitioner
diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
index 80150708fd..41c3af4694 100644
--- a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
@@ -1,6 +1,7 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
+import spark.streaming.{Time, StreamingContext}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 88856364d2..cf72095324 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -1,7 +1,8 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
+import spark.streaming.{StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
@@ -9,7 +10,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.HashSet
-
+private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
new file mode 100644
index 0000000000..1cbb4d536e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FilteredDStream[T: ClassManifest](
+ parent: DStream[T],
+ filterFunc: T => Boolean
+ ) extends DStream[T](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ parent.getOrCompute(validTime).map(_.filter(filterFunc))
+ }
+}
+
+
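All of the new per-operation classes follow this dependencies/slideTime/compute pattern. The DStream.scala hunk is not part of this excerpt, but DStream.filter presumably wires into the class roughly as below (an assumption, not a quote of the source):

  // Assumed wiring inside DStream[T]:
  def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)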
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
new file mode 100644
index 0000000000..11ed8cf317
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ flatMapValueFunc: V => TraversableOnce[U]
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
new file mode 100644
index 0000000000..a13b4c9ff9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index 02d9811669..a6fa378d6e 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -1,17 +1,23 @@
-package spark.streaming
+package spark.streaming.dstream
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import spark.streaming.StreamingContext
+
+import spark.Utils
import spark.storage.StorageLevel
+
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
+
+import scala.collection.JavaConversions._
+
import java.net.InetSocketAddress
-import collection.JavaConversions._
-import spark.Utils
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
+private[streaming]
class FlumeInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -79,7 +85,7 @@ class SparkFlumeEvent() extends Externalizable {
}
}
-object SparkFlumeEvent {
+private[streaming] object SparkFlumeEvent {
def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
@@ -88,6 +94,7 @@ object SparkFlumeEvent {
}
/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
@@ -103,12 +110,13 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
/** A NetworkReceiver which listens for events using the
* Flume Avro interface.*/
+private[streaming]
class FlumeReceiver(
- streamId: Int,
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+ streamId: Int,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)
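A hedged usage sketch of the Flume input (port 4141 is illustrative and ssc is an already created StreamingContext):

  val flumeEvents = ssc.flumeStream("localhost", 4141)   // DStream[SparkFlumeEvent]
  flumeEvents.count().foreach(rdd => println("Flume events in this batch: " + rdd.collect().mkString))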
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
new file mode 100644
index 0000000000..41c629a225
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -0,0 +1,28 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Job, Time}
+
+private[streaming]
+class ForEachDStream[T: ClassManifest] (
+ parent: DStream[T],
+ foreachFunc: (RDD[T], Time) => Unit
+ ) extends DStream[Unit](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Unit]] = None
+
+ override def generateJob(time: Time): Option[Job] = {
+ parent.getOrCompute(time) match {
+ case Some(rdd) =>
+ val jobFunc = () => {
+ foreachFunc(rdd, time)
+ }
+ Some(new Job(time, jobFunc))
+ case None => None
+ }
+ }
+}
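This is the class behind the foreach calls that replace foreachRDD throughout this patch. The corresponding DStream.scala change is not shown here, but the wiring is presumably along these lines (an assumption):

  // Assumed wiring inside DStream[T]:
  def foreach(foreachFunc: (RDD[T], Time) => Unit) {
    val newStream = new ForEachDStream(this, foreachFunc)
    ssc.registerOutputStream(newStream)
  }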
diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
new file mode 100644
index 0000000000..92ea503cae
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -0,0 +1,17 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class GlommedDStream[T: ClassManifest](parent: DStream[T])
+ extends DStream[Array[T]](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Array[T]]] = {
+ parent.getOrCompute(validTime).map(_.glom())
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
new file mode 100644
index 0000000000..4959c66b06
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.streaming.{StreamingContext, DStream}
+
+abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
+ extends DStream[T](ssc_) {
+
+ override def dependencies = List()
+
+ override def slideTime = {
+ if (ssc == null) throw new Exception("ssc is null")
+ if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
+ ssc.graph.batchDuration
+ }
+
+ def start()
+
+ def stop()
+}
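Since InputDStream stays public, custom sources can extend it directly. A minimal sketch that emits one small RDD per batch (class and value names are illustrative):

  import spark.RDD
  import spark.streaming.{StreamingContext, Time}
  import spark.streaming.dstream.InputDStream

  class SingleValueInputDStream(@transient ssc_ : StreamingContext, value: String)
    extends InputDStream[String](ssc_) {

    override def start() {}   // nothing to set up
    override def stop() {}    // nothing to tear down

    override def compute(validTime: Time): Option[RDD[String]] = {
      Some(ssc_.sc.parallelize(Seq(value)))   // one single-element RDD per batch interval
    }
  }

  // Registration: ssc.registerInputStream(new SingleValueInputDStream(ssc, "ping"))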
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 66f60519bc..b1941fb427 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -1,30 +1,36 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.Logging
+import spark.storage.StorageLevel
+import spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
import java.util.Properties
import java.util.concurrent.Executors
+
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-import spark._
-import spark.RDD
-import spark.storage.StorageLevel
+
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that it sent to the Master
+private[streaming]
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// NOT USED - Originally intended for fault-tolerance
// Checkpoint data specific to a KafkaInputDstream
-case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
+private[streaming]
+case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
- * Input stream that pulls messages form a Kafka Broker.
+ * Input stream that pulls messages from a Kafka Broker.
*
* @param host Zookeper hostname.
* @param port Zookeper port.
@@ -35,6 +41,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
* By default the value is pulled from zookeper.
* @param storageLevel RDD storage level.
*/
+private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -94,6 +101,7 @@ class KafkaInputDStream[T: ClassManifest](
}
}
+private[streaming]
class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
new file mode 100644
index 0000000000..daf78c6893
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
new file mode 100644
index 0000000000..689caeef0e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ mapValueFunc: V => U
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
new file mode 100644
index 0000000000..786b9966f2
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MappedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ mapFunc: T => U
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.map[U](mapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 4e4e9fc942..41276da8bb 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -1,12 +1,13 @@
-package spark.streaming
+package spark.streaming.dstream
-import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver}
import spark.{Logging, SparkEnv, RDD}
import spark.rdd.BlockRDD
-import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
+import scala.collection.mutable.ArrayBuffer
+
import java.nio.ByteBuffer
import akka.actor.{Props, Actor}
@@ -40,10 +41,10 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
}
-sealed trait NetworkReceiverMessage
-case class StopReceiver(msg: String) extends NetworkReceiverMessage
-case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
-case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] sealed trait NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging {
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index bb86e51932..024bf3bea4 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -1,10 +1,11 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext}
class QueueInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 6acaa9aab1..aa2f31cea8 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -1,12 +1,15 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.{DaemonThread, Logging}
+import spark.storage.StorageLevel
+import spark.streaming.StreamingContext
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
import java.io.EOFException
import java.util.concurrent.ArrayBlockingQueue
-import spark._
-import spark.storage.StorageLevel
+
/**
* An input stream that reads blocks of serialized objects from a given network address.
@@ -14,6 +17,7 @@ import spark.storage.StorageLevel
* data into Spark Streaming, though it requires the sender to batch data and serialize it
* in the format that the system is configured with.
*/
+private[streaming]
class RawInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -26,6 +30,7 @@ class RawInputDStream[T: ClassManifest](
}
}
+private[streaming]
class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
extends NetworkReceiver[Any](streamId) {
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index f63a9e0011..d289ed2a3f 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -1,17 +1,17 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.UnionRDD
import spark.rdd.CoGroupedRDD
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
-import collection.SeqProxy
+import spark.streaming.{Interval, Time, DStream}
+private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
new file mode 100644
index 0000000000..6854bbe665
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -0,0 +1,27 @@
+package spark.streaming.dstream
+
+import spark.{RDD, Partitioner}
+import spark.SparkContext._
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ parent: DStream[(K,V)],
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner
+ ) extends DStream [(K,C)] (parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+ parent.getOrCompute(validTime) match {
+ case Some(rdd) =>
+ Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index f7a34d2515..8374f131d6 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -1,15 +1,12 @@
-package spark.streaming
+package spark.streaming.dstream
-import spark.streaming.util.{RecurringTimer, SystemClock}
+import spark.streaming.StreamingContext
import spark.storage.StorageLevel
import java.io._
import java.net.Socket
-import java.util.concurrent.ArrayBlockingQueue
-
-import scala.collection.mutable.ArrayBuffer
-import scala.Serializable
+private[streaming]
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -23,7 +20,7 @@ class SocketInputDStream[T: ClassManifest](
}
}
-
+private[streaming]
class SocketReceiver[T: ClassManifest](
streamId: Int,
host: String,
@@ -54,7 +51,7 @@ class SocketReceiver[T: ClassManifest](
}
-
+private[streaming]
object SocketReceiver {
/**
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index b7e4c1c30c..175b3060c1 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -1,12 +1,12 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
-import spark.rdd.BlockRDD
import spark.Partitioner
-import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.streaming.{Time, DStream}
+private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
new file mode 100644
index 0000000000..0337579514
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ transformFunc: (RDD[T], Time) => RDD[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
new file mode 100644
index 0000000000..3bf4c2ecea
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -0,0 +1,40 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import collection.mutable.ArrayBuffer
+import spark.rdd.UnionRDD
+
+private[streaming]
+class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
+ extends DStream[T](parents.head.ssc) {
+
+ if (parents.length == 0) {
+ throw new IllegalArgumentException("Empty array of parents")
+ }
+
+ if (parents.map(_.ssc).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different StreamingContexts")
+ }
+
+ if (parents.map(_.slideTime).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different slide times")
+ }
+
+ override def dependencies = parents.toList
+
+ override def slideTime: Time = parents.head.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val rdds = new ArrayBuffer[RDD[T]]()
+ parents.map(_.getOrCompute(validTime)).foreach(_ match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ })
+ if (rdds.size > 0) {
+ Some(new UnionRDD(ssc.sc, rdds))
+ } else {
+ None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index e4d2a634f5..7718794cbf 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -1,10 +1,11 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import spark.storage.StorageLevel
+import spark.streaming.{Interval, Time, DStream}
-
+private[streaming]
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
_windowTime: Time,
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index 6cb2b4c042..dfaaf03f03 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -25,8 +25,8 @@ object GrepRaw {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
- val union = new UnionDStream(rawStreams)
- union.filter(_.contains("Alice")).count().foreachRDD(r =>
+ val union = ssc.union(rawStreams)
+ union.filter(_.contains("Alice")).count().foreach(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index fe4c2bf155..338834bc3c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -34,11 +34,11 @@ object TopKWordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
- val union = new UnionDStream(lines.toArray)
+ val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
- partialTopKWindowedCounts.foreachRDD(rdd => {
+ partialTopKWindowedCounts.foreach(rdd => {
val collectedCounts = rdd.collect
println("Collected " + collectedCounts.size + " words from partial top words")
println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index a29c81d437..d93335a8ce 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -33,10 +33,10 @@ object WordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
- val union = new UnionDStream(lines.toArray)
+ val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
+ windowedCounts.foreach(r => println("# unique words = " + r.count()))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 68be6b7893..a191321d91 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -72,7 +72,7 @@ object PageViewStream {
case "popularUsersSeen" =>
// Look for users in our existing dataset and print it out if we have a match
pageViews.map(view => (view.userID, 1))
- .foreachRDD((rdd, time) => rdd.join(userList)
+ .foreach((rdd, time) => rdd.join(userList)
.map(_._2._2)
.take(10)
.foreach(u => println("Saw user %s at time %s".format(u, time))))
diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala
index ed087e4ea8..974651f9f6 100644
--- a/streaming/src/main/scala/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala
@@ -1,13 +1,12 @@
package spark.streaming.util
-import spark.streaming._
-
-trait Clock {
+private[streaming]
+trait Clock {
def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
-
+private[streaming]
class SystemClock() extends Clock {
val minPollTime = 25L
@@ -54,6 +53,7 @@ class SystemClock() extends Clock {
}
}
+private[streaming]
class ManualClock() extends Clock {
var time = 0L
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index dc55fd902b..db715cc295 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -1,5 +1,6 @@
package spark.streaming.util
+private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
val minPollTime = 25L
@@ -53,6 +54,7 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
+private[streaming]
object RecurringTimer {
def main(args: Array[String]) {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0d82b2f1ea..920388bba9 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val stateStreamCheckpointInterval = Seconds(1)
// this ensure checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
val secondNumBatches = firstNumBatches
// Setup the streams
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 5b414117fc..4aa428bf64 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -133,7 +133,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
// Get the output buffer
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
val output = outputStream.output
- val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong
+ val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
val startTime = System.currentTimeMillis()
try {
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index ed9a659092..76b528bec3 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.SparkFlumeEvent
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 8cc2f8ccfc..28bdd53c3c 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -1,12 +1,16 @@
package spark.streaming
+import spark.streaming.dstream.{InputDStream, ForEachDStream}
+import spark.streaming.util.ManualClock
+
import spark.{RDD, Logging}
-import util.ManualClock
+
import collection.mutable.ArrayBuffer
-import org.scalatest.FunSuite
import collection.mutable.SynchronizedBuffer
+
import java.io.{ObjectInputStream, IOException}
+import org.scalatest.FunSuite
/**
* This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -35,7 +39,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
- extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}) {
@@ -70,6 +74,10 @@ trait TestSuiteBase extends FunSuite with Logging {
def actuallyWait = false
+ /**
+ * Set up required DStreams to test the DStream operation using the sequence
+ * of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
@@ -90,6 +98,10 @@ trait TestSuiteBase extends FunSuite with Logging {
ssc
}
+ /**
+ * Set up required DStreams to test the binary operation using the two sequences
+ * of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -173,6 +185,11 @@ trait TestSuiteBase extends FunSuite with Logging {
output
}
+ /**
+ * Verify whether the output values after running a DStream operation
+ * are the same as the expected output values, by comparing the output
+ * collections either as lists (order matters) or as sets (order does not matter)
+ */
def verifyOutput[V: ClassManifest](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
@@ -199,6 +216,10 @@ trait TestSuiteBase extends FunSuite with Logging {
logInfo("Output verified successfully")
}
+ /**
+ * Test a unary DStream operation with a list of inputs, with the number of
+ * batches to run equal to the number of expected output values
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -208,6 +229,15 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V](input, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test a unary DStream operation with a list of inputs
+ * @param input Sequence of input collections
+ * @param operation Unary DStream operation to be applied to the input
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order does not matter) or as lists (order matters)
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -221,6 +251,10 @@ trait TestSuiteBase extends FunSuite with Logging {
verifyOutput[V](output, expectedOutput, useSet)
}
+ /**
+ * Test a binary DStream operation with two lists of inputs, with the number of
+ * batches to run equal to the number of expected output values
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -231,6 +265,16 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test a binary DStream operation with two lists of inputs
+ * @param input1 First sequence of input collections
+ * @param input2 Second sequence of input collections
+ * @param operation Binary DStream operation to be applied to the 2 inputs
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order does not matter) or as lists (order matters)
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
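For reference, a hedged sketch of a suite built on these helpers (the operation and data are illustrative; framework name, master and batch duration come from the TestSuiteBase defaults):

  import spark.streaming.{DStream, TestSuiteBase}

  class MapOperationSuite extends TestSuiteBase {
    test("map - times ten") {
      val input          = Seq(Seq(1, 2, 3), Seq(4, 5))
      val expectedOutput = Seq(Seq(10, 20, 30), Seq(40, 50))
      // Runs expectedOutput.size batches and compares batch-by-batch as lists.
      testOperation(input, (s: DStream[Int]) => s.map(_ * 10), expectedOutput, false)
    }
  }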
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 3e20e16708..4bc5229465 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -209,7 +209,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.groupByKeyAndWindow(windowTime, slideTime)
.map(x => (x._1, x._2.toSet))
@@ -223,7 +223,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -233,7 +233,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
}
@@ -251,7 +251,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("window - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -265,7 +265,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
}
@@ -281,7 +281,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindowInv - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
.persist()