aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-19 03:07:10 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-19 03:07:10 -0800
commit8b9c673fce1c733c7fcd8b978e84f943be9e9e35 (patch)
tree4ed7422380ffcd0126053d419c97082cc32fc83e
parentbcee3cb2db2f213efdc142d83419dfd9e35bc4fe (diff)
parent7e30c46aaf337eb95c9ec37ddc2ad79439430c96 (diff)
downloadspark-8b9c673fce1c733c7fcd8b978e84f943be9e9e35.tar.gz
spark-8b9c673fce1c733c7fcd8b978e84f943be9e9e35.tar.bz2
spark-8b9c673fce1c733c7fcd8b978e84f943be9e9e35.zip
Merge pull request #476 from tdas/streaming
Major modifications to fix driver fault-tolerance with file input stream
-rw-r--r--core/src/main/scala/spark/rdd/CheckpointRDD.scala4
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala10
-rw-r--r--docs/plugin-custom-receiver.md101
-rw-r--r--docs/python-programming-guide.md2
-rw-r--r--docs/streaming-programming-guide.md250
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java)0
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java)2
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaQueueStream.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java)0
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala158
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala12
-rw-r--r--examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala2
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala)33
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala13
-rw-r--r--project/SparkBuild.scala8
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala67
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala142
-rw-r--r--streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/DStreamGraph.scala58
-rw-r--r--streaming/src/main/scala/spark/streaming/Duration.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/Interval.scala1
-rw-r--r--streaming/src/main/scala/spark/streaming/JobManager.scala18
-rw-r--r--streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala23
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala154
-rw-r--r--streaming/src/main/scala/spark/streaming/Scheduler.scala105
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala111
-rw-r--r--streaming/src/main/scala/spark/streaming/Time.scala13
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala27
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala87
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala122
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala17
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala72
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala4
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala36
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala11
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala17
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala13
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala30
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala)13
-rw-r--r--streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala153
-rw-r--r--streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala392
-rw-r--r--streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala30
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java157
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala1
-rw-r--r--streaming/src/test/resources/log4j.properties1
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala43
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala114
-rw-r--r--streaming/src/test/scala/spark/streaming/FailureSuite.scala189
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala101
-rw-r--r--streaming/src/test/scala/spark/streaming/TestSuiteBase.scala17
-rw-r--r--streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala62
52 files changed, 2142 insertions, 862 deletions
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 96b593ba7c..a21338f85f 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -24,8 +24,8 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
val dirContents = fs.listStatus(new Path(checkpointPath))
val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
val numSplits = splitFiles.size
- if (!splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
- !splitFiles(numSplits-1).endsWith(CheckpointRDD.splitIdToFile(numSplits-1))) {
+ if (numSplits > 0 && (!splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
+ !splitFiles(numSplits-1).endsWith(CheckpointRDD.splitIdToFile(numSplits-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
}
Array.tabulate(numSplits)(i => new CheckpointRDDSplit(i))
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0b74607fb8..4425949f46 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -162,6 +162,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
}
+ test("CheckpointRDD with zero partitions") {
+ val rdd = new BlockRDD[Int](sc, Array[String]())
+ assert(rdd.splits.size === 0)
+ assert(rdd.isCheckpointed === false)
+ rdd.checkpoint()
+ assert(rdd.count() === 0)
+ assert(rdd.isCheckpointed === true)
+ assert(rdd.splits.size === 0)
+ }
+
/**
* Test checkpointing of the final RDD generated by the given operation. By default,
* this method tests whether the size of serialized RDD has reduced after checkpointing or not.
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md
new file mode 100644
index 0000000000..0eb4246158
--- /dev/null
+++ b/docs/plugin-custom-receiver.md
@@ -0,0 +1,101 @@
+---
+layout: global
+title: Tutorial - Spark streaming, Plugging in a custom receiver.
+---
+
+A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
+
+This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
+
+
+## A quick and naive walk-through
+
+### Write a simple receiver
+
+This starts with implementing [Actor](#References)
+
+Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api.
+
+{% highlight scala %}
+
+ class SocketTextStreamReceiver (host:String,
+ port:Int,
+ bytesToString: ByteString => String) extends Actor with Receiver {
+
+ override def preStart = IOManager(context.system).connect(host, port)
+
+ def receive = {
+ case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
+ }
+
+ }
+
+
+{% endhighlight %}
+
+All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details.
+
+### A sample spark application
+
+* First create a Spark streaming context with master url and batchduration.
+
+{% highlight scala %}
+
+ val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
+ Seconds(batchDuration))
+
+{% endhighlight %}
+
+* Plug-in the actor configuration into the spark streaming context and create a DStream.
+
+{% highlight scala %}
+
+ val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
+ "localhost",8445, z => z.utf8String)),"SocketReceiver")
+
+{% endhighlight %}
+
+* Process it.
+
+{% highlight scala %}
+
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
+ wordCounts.print()
+ ssc.start()
+
+
+{% endhighlight %}
+
+* After processing it, stream can be tested using the netcat utility.
+
+ $ nc -l localhost 8445
+ hello world
+ hello hello
+
+
+## Multiple homogeneous/heterogeneous receivers.
+
+A DStream union operation is provided for taking union on multiple input streams.
+
+{% highlight scala %}
+
+ val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
+ "localhost",8445, z => z.utf8String)),"SocketReceiver")
+
+ // Another socket stream receiver
+ val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
+ "localhost",8446, z => z.utf8String)),"SocketReceiver")
+
+ val union = lines.union(lines2)
+
+{% endhighlight %}
+
+Above stream can be easily process as described earlier.
+
+_A more comprehensive example is provided in the spark streaming examples_
+
+## References
+
+1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 4e84d23edf..2012241a6a 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -87,7 +87,7 @@ By default, the `pyspark` shell creates SparkContext that runs jobs locally.
To connect to a non-local cluster, set the `MASTER` environment variable.
For example, to use the `pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
-{% highlight shell %}
+{% highlight bash %}
$ MASTER=spark://IP:PORT ./pyspark
{% endhighlight %}
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index b6da7af654..71e1bd4aab 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -34,34 +34,34 @@ The StreamingContext is used to creating InputDStreams from input sources:
{% highlight scala %}
// Assuming ssc is the StreamingContext
-ssc.networkStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
-ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in a HDFS directory
+ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in a HDFS directory
+ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
{% endhighlight %}
-A complete list of input sources is available in the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext). Data received from these sources can be processed using DStream operations, which are explained next.
+We also provide a input streams for Kafka, Flume, Akka actor, etc. For a complete list of input streams, take a look at the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext).
# DStream Operations
-Once an input DStream has been created, you can transform it using _DStream operators_. Most of these operators return new DStreams which you can further transform. Eventually, you'll need to call an _output operator_, which forces evaluation of the DStream by writing data out to an external source.
+Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call the output operations, which writies data out to an external source.
## Transformations
DStreams support many of the transformations available on normal Spark RDD's:
<table class="table">
-<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
+<tr><th style="width:30%">Transformation</th><th>Meaning</th></tr>
<tr>
<td> <b>map</b>(<i>func</i>) </td>
- <td> Returns a new DStream formed by passing each element of the source through a function <i>func</i>. </td>
+ <td> Returns a new DStream formed by passing each element of the source DStream through a function <i>func</i>. </td>
</tr>
<tr>
<td> <b>filter</b>(<i>func</i>) </td>
- <td> Returns a new stream formed by selecting those elements of the source on which <i>func</i> returns true. </td>
+ <td> Returns a new DStream formed by selecting those elements of the source DStream on which <i>func</i> returns true. </td>
</tr>
<tr>
<td> <b>flatMap</b>(<i>func</i>) </td>
- <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
+ <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a <code>Seq</code> rather than a single item). </td>
</tr>
<tr>
<td> <b>mapPartitions</b>(<i>func</i>) </td>
@@ -70,73 +70,92 @@ DStreams support many of the transformations available on normal Spark RDD's:
</tr>
<tr>
<td> <b>union</b>(<i>otherStream</i>) </td>
- <td> Return a new stream that contains the union of the elements in the source stream and the argument. </td>
+ <td> Return a new DStream that contains the union of the elements in the source DStream and the argument DStream. </td>
+</tr>
+<tr>
+ <td> <b>count</b>() </td>
+ <td> Returns a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. </td>
+</tr>
+<tr>
+ <td> <b>reduce</b>(<i>func</i>) </td>
+ <td> Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function <i>func</i> (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. </td>
+</tr>
+<tr>
+ <td> <b>countByValue</b>() </td>
+ <td> When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. </td>
</tr>
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
- <td> When called on a stream of (K, V) pairs, returns a stream of (K, Seq[V]) pairs. <br />
-<b>Note:</b> By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
+ <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream. <br />
+ <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
</td>
</tr>
<tr>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
- <td> When called on a stream of (K, V) pairs, returns a stream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
+ <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>join</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td>
- <td> When called on streams of type (K, V) and (K, W), returns a stream of (K, (V, W)) pairs with all pairs of elements for each key. </td>
+ <td> When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. </td>
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td>
- <td> When called on DStream of type (K, V) and (K, W), returns a DStream of (K, Seq[V], Seq[W]) tuples.</td>
-</tr>
-<tr>
- <td> <b>reduce</b>(<i>func</i>) </td>
- <td> Returns a new DStream of single-element RDDs by aggregating the elements of the stream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed correctly in parallel. </td>
+ <td> When called on DStream of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples.</td>
</tr>
<tr>
<td> <b>transform</b>(<i>func</i>) </td>
<td> Returns a new DStream by applying func (a RDD-to-RDD function) to every RDD of the stream. This can be used to do arbitrary RDD operations on the DStream. </td>
</tr>
+<tr>
+ <td> <b>updateStateByKey</b>(<i>func</i>) </td>
+ <td> Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of each key. This can be used to track session state by using the session-id as the key and updating the session state as new data is received.</td>
+</tr>
+
</table>
-Spark Streaming features windowed computations, which allow you to report statistics over a sliding window of data. All window functions take a <i>windowDuration</i>, which represents the width of the window and a <i>slideTime</i>, which represents the frequency during which the window is calculated.
+Spark Streaming features windowed computations, which allow you to apply transformations over a sliding window of data. All window functions take a <i>windowDuration</i>, which represents the width of the window and a <i>slideTime</i>, which represents the frequency during which the window is calculated.
<table class="table">
-<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
+<tr><th style="width:30%">Transformation</th><th>Meaning</th></tr>
<tr>
- <td> <b>window</b>(<i>windowDuration</i>, </i>slideTime</i>) </td>
- <td> Return a new stream which is computed based on windowed batches of the source stream. <i>windowDuration</i> is the width of the window and <i>slideTime</i> is the frequency during which the window is calculated. Both times must be multiples of the batch interval.
+ <td> <b>window</b>(<i>windowDuration</i>, </i>slideDuration</i>) </td>
+ <td> Return a new DStream which is computed based on windowed batches of the source DStream. <i>windowDuration</i> is the width of the window and <i>slideTime</i> is the frequency during which the window is calculated. Both times must be multiples of the batch interval.
</td>
</tr>
<tr>
- <td> <b>countByWindow</b>(<i>windowDuration</i>, </i>slideTime</i>) </td>
+ <td> <b>countByWindow</b>(<i>windowDuration</i>, </i>slideDuration</i>) </td>
<td> Return a sliding count of elements in the stream. <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
<tr>
- <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowDuration</i>, </i>slideDuration</i>) </td>
+ <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowDuration</i>, <i>slideDuration</i>) </td>
<td> Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using <i>func</i>. The function should be associative so that it can be computed correctly in parallel. <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
<tr>
- <td> <b>groupByKeyAndWindow</b>(windowDuration, slideDuration, [<i>numTasks</i>])
+ <td> <b>groupByKeyAndWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>])
</td>
- <td> When called on a stream of (K, V) pairs, returns a stream of (K, Seq[V]) pairs over a sliding window. <br />
-<b>Note:</b> By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks. <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
-</td>
+ <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together values of each key over batches in a sliding window. <br />
+<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
</tr>
<tr>
- <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, [<i>numTasks</i>]) </td>
- <td> When called on a stream of (K, V) pairs, returns a stream of (K, V) pairs where the values for each key are aggregated using the given reduce function over batches within a sliding window. Like in <code>groupByKeyAndWindow</code>, the number of reduce tasks is configurable through an optional second argument.
+ <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td>
+ <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i> over batches in a sliding window. Like in <code>groupByKeyAndWindow</code>, the number of reduce tasks is configurable through an optional second argument.
<i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
<tr>
- <td> <b>countByKeyAndWindow</b>([<i>numTasks</i>]) </td>
- <td> When called on a stream of (K, V) pairs, returns a stream of (K, Int) pairs where the values for each key are the count within a sliding window. Like in <code>countByKeyAndWindow</code>, the number of reduce tasks is configurable through an optional second argument.
+ <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>invFunc</i>, <i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td>
+ <td> A more efficient version of the above <code>reduceByKeyAndWindow()</code> where the reduce value of each window is calculated
+ incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter <i>invFunc</i>. Like in <code>groupByKeyAndWindow</code>, the number of reduce tasks is configurable through an optional second argument.
<i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
-</td>
+</td>
+</tr>
+<tr>
+ <td> <b>countByValueAndWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td>
+ <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in <code>groupByKeyAndWindow</code>, the number of reduce tasks is configurable through an optional second argument.
+ <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
+</td>
</tr>
</table>
@@ -147,7 +166,7 @@ A complete list of DStream operations is available in the API documentation of [
When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:
<table class="table">
-<tr><th style="width:25%">Operator</th><th>Meaning</th></tr>
+<tr><th style="width:30%">Operator</th><th>Meaning</th></tr>
<tr>
<td> <b>foreach</b>(<i>func</i>) </td>
<td> The fundamental output operator. Applies a function, <i>func</i>, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. </td>
@@ -176,11 +195,6 @@ When an output operator is called, it triggers the computation of a stream. Curr
</table>
-## DStream Persistence
-Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using `persist()` method on a DStream would automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple DStream operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`.
-
-Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the [Performance Tuning](#memory-tuning) section. More information on different persistence levels can be found in [Spark Programming Guide](scala-programming-guide.html#rdd-persistence).
-
# Starting the Streaming computation
All the above DStream operations are completely lazy, that is, the operations will start executing only after the context is started by using
{% highlight scala %}
@@ -192,8 +206,8 @@ Conversely, the computation can be stopped by using
ssc.stop()
{% endhighlight %}
-# Example - NetworkWordCount.scala
-A good example to start off is the spark.streaming.examples.NetworkWordCount. This example counts the words received from a network server every second. Given below is the relevant sections of the source code. You can find the full source code in <Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala.
+# Example
+A simple example to start off is the [NetworkWordCount](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala). This example counts the words received from a network server every second. Given below is the relevant sections of the source code. You can find the full source code in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala` .
{% highlight scala %}
import spark.streaming.{Seconds, StreamingContext}
@@ -260,6 +274,31 @@ Time: 1357008430000 ms
</td>
</table>
+You can find more examples in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/`. They can be run in the similar manner using `./run spark.streaming.examples....` . Executing without any parameter would give the required parameter list. Further explanation to run them can be found in comments in the files.
+
+# DStream Persistence
+Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using `persist()` method on a DStream would automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`.
+
+For input streams that receive data from the network (that is, subclasses of NetworkInputDStream like FlumeInputDStream and KafkaInputDStream), the default persistence level is set to replicate the data to two nodes for fault-tolerance.
+
+Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the [Performance Tuning](#memory-tuning) section. More information on different persistence levels can be found in [Spark Programming Guide](scala-programming-guide.html#rdd-persistence).
+
+# RDD Checkpointing within DStreams
+DStreams created by stateful operations like `updateStateByKey` require the RDDs in the DStream to be periodically saved to HDFS files for checkpointing. This is because, unless checkpointed, the lineage of operations of the state RDDs can increase indefinitely (since each RDD in the DStream depends on the previous RDD). This leads to two problems - (i) the size of Spark tasks increase proportionally with the RDD lineage leading higher task launch times, (ii) no limit on the amount of recomputation required on failure. Checkpointing RDDs at some interval by writing them to HDFS allows the lineage to be truncated. Note that checkpointing also incurs the cost of saving to HDFS which may cause the corresponding batch to take longer to process. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too slowly causes the lineage and task sizes to grow which may have detrimental effects. Typically, a checkpoint interval of 5 - 10 times of sliding interval of a DStream is good setting to try.
+
+To enable checkpointing, the developer has to provide the HDFS path to which RDD will be saved. This is done by using
+
+{% highlight scala %}
+ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext
+{% endhighlight %}
+
+The interval of checkpointing of a DStream can be set by using
+
+{% highlight scala %}
+dstream.checkpoint(checkpointInterval) // checkpointInterval must be a multiple of slide duration of dstream
+{% endhighlight %}
+
+For DStreams that must be checkpointed (that is, DStreams created by `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by default set to a multiple of the DStream's sliding interval such that its at least 10 seconds.
# Performance Tuning
@@ -273,17 +312,21 @@ Getting the best performance of a Spark Streaming application on a cluster requi
There are a number of optimizations that can be done in Spark to minimize the processing time of each batch. These have been discussed in detail in [Tuning Guide](tuning.html). This section highlights some of the most important ones.
### Level of Parallelism
-Cluster resources maybe underutilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like `reduceByKey` and `reduceByKeyAndWindow`, the default number of parallel tasks is 8. You can pass the level of parallelism as an argument (see the [`spark.PairDStreamFunctions`](api/streaming/index.html#spark.PairDStreamFunctions) documentation), or set the system property `spark.default.parallelism` to change the default.
+Cluster resources maybe under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like `reduceByKey` and `reduceByKeyAndWindow`, the default number of parallel tasks is 8. You can pass the level of parallelism as an argument (see the [`spark.PairDStreamFunctions`](api/streaming/index.html#spark.PairDStreamFunctions) documentation), or set the system property `spark.default.parallelism` to change the default.
### Data Serialization
The overhead of data serialization can be significant, especially when sub-second batch sizes are to be achieved. There are two aspects to it.
-* Serialization of RDD data in Spark: Please refer to the detailed discussion on data serialization in the [Tuning Guide](tuning.html). However, note that unlike Spark, by default RDDs are persisted as serialized byte arrays to minimize pauses related to GC.
-* Serialization of input data: To ingest external data into Spark, data received as bytes (say, from the network) needs to deserialized from bytes and re-serialized into Spark's serialization format. Hence, the deserialization overhead of input data may be a bottleneck.
+
+* **Serialization of RDD data in Spark**: Please refer to the detailed discussion on data serialization in the [Tuning Guide](tuning.html). However, note that unlike Spark, by default RDDs are persisted as serialized byte arrays to minimize pauses related to GC.
+
+* **Serialization of input data**: To ingest external data into Spark, data received as bytes (say, from the network) needs to deserialized from bytes and re-serialized into Spark's serialization format. Hence, the deserialization overhead of input data may be a bottleneck.
### Task Launching Overheads
If the number of tasks launched per second is high (say, 50 or more per second), then the overhead of sending out tasks to the slaves maybe significant and will make it hard to achieve sub-second latencies. The overhead can be reduced by the following changes:
-* Task Serialization: Using Kryo serialization for serializing tasks can reduced the task sizes, and therefore reduce the time taken to send them to the slaves.
-* Execution mode: Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the [Running on Mesos guide](running-on-mesos.html) for more details.
+
+* **Task Serialization**: Using Kryo serialization for serializing tasks can reduced the task sizes, and therefore reduce the time taken to send them to the slaves.
+
+* **Execution mode**: Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the [Running on Mesos guide](running-on-mesos.html) for more details.
These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.
## Setting the Right Batch Size
@@ -292,22 +335,121 @@ For a Spark Streaming application running on a cluster to be stable, the process
A good approach to figure out the right batch size for your application is to test it with a conservative batch size (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with data rate, you can check the value of the end-to-end delay experienced by each processed batch (in the Spark master logs, find the line having the phrase "Total delay"). If the delay is maintained to be less than the batch size, then system is stable. Otherwise, if the delay is continuously increasing, it means that the system is unable to keep up and it therefore unstable. Once you have an idea of a stable configuration, you can try increasing the data rate and/or reducing the batch size. Note that momentary increase in the delay due to temporary data rate increases maybe fine as long as the delay reduces back to a low value (i.e., less than batch size).
## 24/7 Operation
-By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic cleanup of it metadata. This can be enabled by setting the Java system property `spark.cleaner.delay` to the number of minutes you want any metadata to persist. For example, setting `spark.cleaner.delay` to 10 would cause Spark periodically cleanup all metadata and persisted RDDs that are older than 10 minutes. Note, that this property needs to be set before the SparkContext is created.
+By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic cleanup of it metadata. This can be enabled by setting the Java system property `spark.cleaner.delay` to the number of seconds you want any metadata to persist. For example, setting `spark.cleaner.delay` to 600 would cause Spark periodically cleanup all metadata and persisted RDDs that are older than 10 minutes. Note, that this property needs to be set before the SparkContext is created.
This value is closely tied with any window operation that is being used. Any window operation would require the input data to be persisted in memory for at least the duration of the window. Hence it is necessary to set the delay to at least the value of the largest window operation used in the Spark Streaming application. If this delay is set too low, the application will throw an exception saying so.
## Memory Tuning
Tuning the memory usage and GC behavior of Spark applications have been discussed in great detail in the [Tuning Guide](tuning.html). It is recommended that you read that. In this section, we highlight a few customizations that are strongly recommended to minimize GC related pauses in Spark Streaming applications and achieving more consistent batch processing times.
-* <b>Default persistence level of DStreams</b>: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, [StorageLevel.MEMORY_ONLY_SER](api/core/index.html#spark.storage.StorageLevel$) for DStream compared to [StorageLevel.MEMORY_ONLY](api/core/index.html#spark.storage.StorageLevel$) for RDDs). Even though keeping the data serialized incurs a higher serialization overheads, it significantly reduces GC pauses.
+* **Default persistence level of DStreams**: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, [StorageLevel.MEMORY_ONLY_SER](api/core/index.html#spark.storage.StorageLevel$) for DStream compared to [StorageLevel.MEMORY_ONLY](api/core/index.html#spark.storage.StorageLevel$) for RDDs). Even though keeping the data serialized incurs a higher serialization overheads, it significantly reduces GC pauses.
+
+* **Concurrent garbage collector**: Using the concurrent mark-and-sweep GC further minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times.
-* <b>Concurrent garbage collector</b>: Using the concurrent mark-and-sweep GC further minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times.
+# Fault-tolerance Properties
+There are two aspects to fault-tolerance - failure of a worker node and that of a driver node. In this section, we are going to discuss the fault-tolerance behavior and the semantics of the processed data.
-# Master Fault-tolerance (Alpha)
-TODO
+## Failure of a Worker Node
+In case of the worker node failure, none of the processed data will be lost because
-* Checkpointing of DStream graph
+1. All the input data is fault-tolerant (either the data is on HDFS, or it replicated Spark Streaming if received from the network)
+1. All intermediate data is expressed as RDDs with their lineage to the input data, which allows Spark to recompute any part of the intermediate data is lost to worker node failure.
+
+If the worker node where a network data receiver is running fails, then the receiver will be restarted on a different node and it will continue to receive data. However, data that was accepted by the receiver but not yet replicated to other Spark nodes may be lost, which is a fraction of a second of data.
+
+Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
+
+## Failure of a Driver Node
+A system that is required to operate 24/7 needs to be able tolerate the failure of the drive node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node. To elaborate, the following state is periodically saved to a file.
+
+1. The DStream operator graph (input streams, output streams, etc.)
+1. The configuration of each DStream (checkpoint interval, etc.)
+1. The RDD checkpoint files of each DStream
+
+All this is periodically saved in the file `<checkpoint directory>/graph` where `<checkpoint directory>` is the HDFS path set using `ssc.checkpoint(...)` as described earlier. To recover, a new Streaming Context can be created with this directory by using
+
+{% highlight scala %}
+val ssc = new StreamingContext(checkpointDirectory)
+{% endhighlight %}
+
+Calling `ssc.start()` on this new context will restart the receivers and the stream computations.
+
+In case of stateful operations (that is, `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the intermediate data at the time of failure also needs to be recomputed.This requires two things - (i) the RDD checkpoints and (ii) the data received since the checkpoints. In the current _alpha_ release, the input data received from the network is not saved durably across driver failures (the data is only replicated in memory of the worker processes and gets lost when the driver fails). Only with file input streams (where the data is already durably stored) is the recovery from driver failure complete and all intermediate data is recomputed. In a future release, this will be true for all input streams. Note that for non-stateful operations, with _all_ input streams, the system will recover and continue receiving and processing new data.
+
+To understand the behavior of the system under driver failure, lets consider what will happen with a file input stream Specifically, in the case of the file input stream, it will correctly identify new files that were created while the driver was down and process them in the same way as it would have if the driver had not failed. To explain further in the case of file input stream, we shall use an example. Lets say, files are being generated every second, and a Spark Streaming program reads every new file and output the number of lines in the file. This is what the sequence of outputs would be with and without a driver failure.
+
+<table class="table">
+ <!-- Results table headers -->
+ <tr>
+ <th> Time </th>
+ <th> Number of lines in input file </th>
+ <th> Output without driver failure </th>
+ <th> Output with driver failure </th>
+ </tr>
+ <tr>
+ <td>1</td>
+ <td>10</td>
+ <td>10</td>
+ <td>10</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>20</td>
+ <td>20</td>
+ <td>20</td>
+ </tr>
+ <tr>
+ <td>3</td>
+ <td>30</td>
+ <td>30</td>
+ <td>30</td>
+ </tr>
+ <tr>
+ <td>4</td>
+ <td>40</td>
+ <td>40</td>
+ <td>[DRIVER FAILS]<br />no output</td>
+ </tr>
+ <tr>
+ <td>5</td>
+ <td>50</td>
+ <td>50</td>
+ <td>no output</td>
+ </tr>
+ <tr>
+ <td>6</td>
+ <td>60</td>
+ <td>60</td>
+ <td>no output</td>
+ </tr>
+ <tr>
+ <td>7</td>
+ <td>70</td>
+ <td>70</td>
+ <td>[DRIVER RECOVERS]<br />40, 50, 60, 70</td>
+ </tr>
+ <tr>
+ <td>8</td>
+ <td>80</td>
+ <td>80</td>
+ <td>80</td>
+ </tr>
+ <tr>
+ <td>9</td>
+ <td>90</td>
+ <td>90</td>
+ <td>90</td>
+ </tr>
+ <tr>
+ <td>10</td>
+ <td>100</td>
+ <td>100</td>
+ <td>100</td>
+ </tr>
+</table>
-* Recovery from master faults
+If the driver had crashed in the middle of the processing of time 3, then it will process time 3 and output 30 after recovery.
-* Current state and future directions \ No newline at end of file
+# Where to Go from Here
+* Documentation - [Scala and Java](api/streaming/index.html)
+* More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples)
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
index cddce16e39..cddce16e39 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 4299febfd6..07342beb02 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
@@ -35,7 +35,7 @@ public class JavaNetworkWordCount {
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
- JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
+ JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
index 43c3cd4dfa..43c3cd4dfa 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
new file mode 100644
index 0000000000..71b4e5bf1a
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -0,0 +1,158 @@
+package spark.streaming.examples
+
+import scala.collection.mutable.LinkedList
+import scala.util.Random
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Receiver
+import spark.util.AkkaUtils
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * Sends the random content to every receiver subscribed with 1/2
+ * second delay.
+ */
+class FeederActor extends Actor {
+
+ val rand = new Random()
+ var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+
+ val strings: Array[String] = Array("words ", "may ", "count ")
+
+ def makeMessage(): String = {
+ val x = rand.nextInt(3)
+ strings(x) + strings(2 - x)
+ }
+
+ /*
+ * A thread to generate random messages
+ */
+ new Thread() {
+ override def run() {
+ while (true) {
+ Thread.sleep(500)
+ receivers.foreach(_ ! makeMessage)
+ }
+ }
+ }.start()
+
+ def receive: Receive = {
+
+ case SubscribeReceiver(receiverActor: ActorRef) =>
+ println("received subscribe from %s".format(receiverActor.toString))
+ receivers = LinkedList(receiverActor) ++ receivers
+
+ case UnsubscribeReceiver(receiverActor: ActorRef) =>
+ println("received unsubscribe from %s".format(receiverActor.toString))
+ receivers = receivers.dropWhile(x => x eq receiverActor)
+
+ }
+}
+
+/**
+ * A sample actor as receiver, is also simplest. This receiver actor
+ * goes and subscribe to a typical publisher/feeder actor and receives
+ * data.
+ *
+ * @see [[spark.streaming.examples.FeederActor]]
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+extends Actor with Receiver {
+
+ lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+
+ override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+ def receive = {
+ case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T])
+ }
+
+ override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
+ */
+object FeederActor {
+
+ def main(args: Array[String]) {
+ if(args.length < 2){
+ System.err.println(
+ "Usage: FeederActor <hostname> <port>\n"
+ )
+ System.exit(1)
+ }
+ val Seq(host, port) = args.toSeq
+
+
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+ val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+ println("Feeder started as:" + feeder)
+
+ actorSystem.awaitTermination();
+ }
+}
+
+/**
+ * A sample word count program demonstrating the use of plugging in
+ * Actor as Receiver
+ * Usage: ActorWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
+ *
+ * To run this example locally, you may run Feeder Actor as
+ * `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ */
+object ActorWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ActorWordCount <master> <hostname> <port>" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ val Seq(master, host, port) = args.toSeq
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ActorWordCount",
+ Seconds(10))
+
+ /*
+ * Following is the use of actorStream to plug in custom actor as receiver
+ *
+ * An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e type of data received and InputDstream
+ * should be same.
+ *
+ * For example: Both actorStream and SampleActorReceiver are parameterized
+ * to same type to ensure type safety.
+ */
+
+ val lines = ssc.actorStream[String](
+ Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+ host, port.toInt))), "SampleReceiver")
+
+ //compute wordcount
+ lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 65d5da82fc..9b135a5c54 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -10,6 +10,18 @@ import spark.streaming.StreamingContext._
import spark.storage.StorageLevel
import spark.streaming.util.RawTextHelper._
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `./run spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ */
object KafkaWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 32f7d57bea..7ff70ae2e5 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -27,7 +27,7 @@ object NetworkWordCount {
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
- val lines = ssc.networkTextStream(args(1), args(2).toInt)
+ val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index 377bc0c98e..fdb3a4c73c 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -1,19 +1,19 @@
-package spark.streaming.examples.twitter
+package spark.streaming.examples
-import spark.streaming.StreamingContext._
import spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
import spark.SparkContext._
-import spark.storage.StorageLevel
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
* stream. The stream is instantiated with credentials and optionally filters supplied by the
* command line arguments.
+ *
*/
-object TwitterBasic {
+object TwitterPopularTags {
def main(args: Array[String]) {
if (args.length < 3) {
- System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+ System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
@@ -21,10 +21,8 @@ object TwitterBasic {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
- val stream = new TwitterInputDStream(ssc, username, password, filters,
- StorageLevel.MEMORY_ONLY_SER)
- ssc.registerInputStream(stream)
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+ val stream = ssc.twitterStream(username, password, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
@@ -39,22 +37,17 @@ object TwitterBasic {
// Print popular hashtags
topCounts60.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
topCounts10.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
}
-
}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index a191321d91..fba72519a9 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -27,17 +27,16 @@ object PageViewStream {
val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
- val pageViews = ssc.networkTextStream(host, port)
- .flatMap(_.split("\n"))
- .map(PageView.fromString(_))
+ val pageViews = ssc.socketTextStream(host, port)
+ .flatMap(_.split("\n"))
+ .map(PageView.fromString(_))
// Return a count of views per URL seen in each batch
- val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+ val pageCounts = pageViews.map(view => view.url).countByValue()
// Return a sliding window of page views per URL in the last ten seconds
- val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
- .window(Seconds(10), Seconds(2))
- .countByKey()
+ val slidingPageCounts = pageViews.map(view => view.url)
+ .countByValueAndWindow(Seconds(10), Seconds(2))
// Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index af8b5ba017..c6d3cc8b15 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -154,10 +154,7 @@ object SparkBuild extends Build {
)
def examplesSettings = sharedSettings ++ Seq(
- name := "spark-examples",
- libraryDependencies ++= Seq(
- "org.twitter4j" % "twitter4j-stream" % "3.0.3"
- )
+ name := "spark-examples"
)
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
@@ -166,7 +163,8 @@ object SparkBuild extends Build {
name := "spark-streaming",
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
- "com.github.sgroschupf" % "zkclient" % "0.1"
+ "com.github.sgroschupf" % "zkclient" % "0.1",
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3"
)
) ++ assemblySettings ++ extraAssemblySettings
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index b9eb7f8ec4..7405c8b22e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -6,6 +6,8 @@ import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import java.io._
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import java.util.concurrent.Executors
private[streaming]
@@ -38,32 +40,50 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
val conf = new Configuration()
var fs = file.getFileSystem(conf)
val maxAttempts = 3
+ val executor = Executors.newFixedThreadPool(1)
- def write(checkpoint: Checkpoint) {
- // TODO: maybe do this in a different thread from the main stream execution thread
- var attempts = 0
- while (attempts < maxAttempts) {
- attempts += 1
- try {
- logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
- if (fs.exists(file)) {
- val bkFile = new Path(file.getParent, file.getName + ".bk")
- FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
- logDebug("Moved existing checkpoint file to " + bkFile)
+ class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
+ def run() {
+ var attempts = 0
+ val startTime = System.currentTimeMillis()
+ while (attempts < maxAttempts) {
+ attempts += 1
+ try {
+ logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ if (fs.exists(file)) {
+ val bkFile = new Path(file.getParent, file.getName + ".bk")
+ FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
+ logDebug("Moved existing checkpoint file to " + bkFile)
+ }
+ val fos = fs.create(file)
+ fos.write(bytes)
+ fos.close()
+ fos.close()
+ val finishTime = System.currentTimeMillis();
+ logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
+ "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
+ return
+ } catch {
+ case ioe: IOException =>
+ logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
}
- val fos = fs.create(file)
- val oos = new ObjectOutputStream(fos)
- oos.writeObject(checkpoint)
- oos.close()
- logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'")
- fos.close()
- return
- } catch {
- case ioe: IOException =>
- logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
}
+ logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
}
- logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
+ }
+
+ def write(checkpoint: Checkpoint) {
+ val bos = new ByteArrayOutputStream()
+ val zos = new LZFOutputStream(bos)
+ val oos = new ObjectOutputStream(zos)
+ oos.writeObject(checkpoint)
+ oos.close()
+ bos.close()
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ }
+
+ def stop() {
+ executor.shutdown()
}
}
@@ -85,7 +105,8 @@ object CheckpointReader extends Logging {
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader)
+ val zis = new LZFInputStream(fis)
+ val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
fs.close()
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 0eb6aad187..e1be5ef51c 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -132,7 +132,7 @@ abstract class DStream[T: ClassManifest] (
// Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
- checkpointDuration = slideDuration.max(Seconds(10))
+ checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo("Checkpoint interval automatically set to " + checkpointDuration)
}
@@ -238,13 +238,15 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.remember(parentRememberDuration))
}
- /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */
+ /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
protected def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
+ logInfo("Time " + time + " is valid")
true
}
}
@@ -292,7 +294,7 @@ abstract class DStream[T: ClassManifest] (
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. ForEachDStream).
+ * to generate their own jobs.
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -308,19 +310,18 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Dereference RDDs that are older than rememberDuration.
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This default
+ * implementation clears the old generated RDDs. Subclasses of DStream may override
+ * this to clear their own metadata along with the generated RDDs.
*/
- protected[streaming] def forgetOldMetadata(time: Time) {
+ protected[streaming] def clearOldMetadata(time: Time) {
var numForgotten = 0
- generatedRDDs.keys.foreach(t => {
- if (t <= (time - rememberDuration)) {
- generatedRDDs.remove(t)
- numForgotten += 1
- logInfo("Forgot RDD of time " + t + " from " + this)
- }
- })
- logInfo("Forgot " + numForgotten + " RDDs from " + this)
- dependencies.foreach(_.forgetOldMetadata(time))
+ val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+ generatedRDDs --= oldRDDs.keys
+ logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ dependencies.foreach(_.clearOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.
@@ -443,6 +444,15 @@ abstract class DStream[T: ClassManifest] (
def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
/**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ */
+ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+ this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+
+ /**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
@@ -495,14 +505,16 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * The new DStream generates RDDs with the same interval as this DStream.
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -514,27 +526,39 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Return a new DStream which computed based on tumbling window on this DStream.
- * This is equivalent to window(batchTime, batchTime).
- * @param batchDuration tumbling window duration; must be a multiple of this DStream's
- * batching interval
- */
- def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
-
- /**
* Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined
- * in the window() operation. This is equivalent to
- * window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = {
- this.window(windowDuration, slideDuration).reduce(reduceFunc)
+ this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
@@ -548,14 +572,47 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
/**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int = ssc.sc.defaultParallelism
+ ): DStream[(T, Long)] = {
+
+ this.map(x => (x, 1L)).reduceByKeyAndWindow(
+ (x: Long, y: Long) => x + y,
+ (x: Long, y: Long) => x - y,
+ windowDuration,
+ slideDuration,
+ numPartitions,
+ (x: (T, Long)) => x._2 != 0L
+ )
+ }
+
+ /**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
@@ -572,16 +629,21 @@ abstract class DStream[T: ClassManifest] (
* Return all the RDDs between 'fromTime' to 'toTime' (both included)
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- var time = toTime.floor(slideDuration)
- while (time >= zeroTime && time >= fromTime) {
- getOrCompute(time) match {
- case Some(rdd) => rdds += rdd
- case None => //throw new Exception("Could not get RDD for time " + time)
- }
- time -= slideDuration
+ if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
}
- rdds.toSeq
+ val alignedToTime = toTime.floor(slideDuration)
+ val alignedFromTime = fromTime.floor(slideDuration)
+
+ logInfo("Slicing from " + fromTime + " to " + toTime +
+ " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ if (time >= zeroTime) getOrCompute(time) else None
+ })
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
index a375980b84..6b0fade7c6 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
@@ -87,7 +87,7 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
}
override def toString() = {
- "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]"
+ "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
}
}
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index d5a5496839..adb7f3a24d 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -11,17 +11,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
- private[streaming] var zeroTime: Time = null
- private[streaming] var batchDuration: Duration = null
- private[streaming] var rememberDuration: Duration = null
- private[streaming] var checkpointInProgress = false
+ var rememberDuration: Duration = null
+ var checkpointInProgress = false
- private[streaming] def start(time: Time) {
+ var zeroTime: Time = null
+ var startTime: Time = null
+ var batchDuration: Duration = null
+
+ def start(time: Time) {
this.synchronized {
if (zeroTime != null) {
throw new Exception("DStream graph computation already started")
}
zeroTime = time
+ startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
@@ -29,19 +32,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
- private[streaming] def stop() {
+ def restart(time: Time) {
+ this.synchronized { startTime = time }
+ }
+
+ def stop() {
this.synchronized {
inputStreams.par.foreach(_.stop())
}
}
- private[streaming] def setContext(ssc: StreamingContext) {
+ def setContext(ssc: StreamingContext) {
this.synchronized {
outputStreams.foreach(_.setContext(ssc))
}
}
- private[streaming] def setBatchDuration(duration: Duration) {
+ def setBatchDuration(duration: Duration) {
this.synchronized {
if (batchDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
@@ -51,59 +58,68 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def remember(duration: Duration) {
+ def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
". cannot set it again.")
}
+ rememberDuration = duration
}
- rememberDuration = duration
}
- private[streaming] def addInputStream(inputStream: InputDStream[_]) {
+ def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
- private[streaming] def addOutputStream(outputStream: DStream[_]) {
+ def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStream.setGraph(this)
outputStreams += outputStream
}
}
- private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray }
+ def getInputStreams() = this.synchronized { inputStreams.toArray }
- private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray }
+ def getOutputStreams() = this.synchronized { outputStreams.toArray }
- private[streaming] def generateRDDs(time: Time): Seq[Job] = {
+ def generateJobs(time: Time): Seq[Job] = {
this.synchronized {
- outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ logInfo("Generating jobs for time " + time)
+ val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ logInfo("Generated " + jobs.length + " jobs for time " + time)
+ jobs
}
}
- private[streaming] def forgetOldRDDs(time: Time) {
+ def clearOldMetadata(time: Time) {
this.synchronized {
- outputStreams.foreach(_.forgetOldMetadata(time))
+ logInfo("Clearing old metadata for time " + time)
+ outputStreams.foreach(_.clearOldMetadata(time))
+ logInfo("Cleared old metadata for time " + time)
}
}
- private[streaming] def updateCheckpointData(time: Time) {
+ def updateCheckpointData(time: Time) {
this.synchronized {
+ logInfo("Updating checkpoint data for time " + time)
outputStreams.foreach(_.updateCheckpointData(time))
+ logInfo("Updated checkpoint data for time " + time)
}
}
- private[streaming] def restoreCheckpointData() {
+ def restoreCheckpointData() {
this.synchronized {
+ logInfo("Restoring checkpoint data")
outputStreams.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
}
}
- private[streaming] def validate() {
+ def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
//assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
index e4dc579a17..ee26206e24 100644
--- a/streaming/src/main/scala/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -16,7 +16,7 @@ case class Duration (private val millis: Long) {
def * (times: Int): Duration = new Duration(millis * times)
- def / (that: Duration): Long = millis / that.millis
+ def / (that: Duration): Double = millis.toDouble / that.millis.toDouble
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.millis == 0)
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index dc21dfb722..6a8b81760e 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -30,6 +30,7 @@ class Interval(val beginTime: Time, val endTime: Time) {
override def toString = "[" + beginTime + ", " + endTime + "]"
}
+private[streaming]
object Interval {
def currentInterval(duration: Duration): Interval = {
val time = new Time(System.currentTimeMillis)
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 5acdd01e58..7696c4a592 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -15,8 +15,8 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
- logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
- (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
+ logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
+ (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
} catch {
case e: Exception =>
logError("Running " + job + " failed", e)
@@ -38,19 +38,29 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
logInfo("Added " + job + " to queue")
}
+ def stop() {
+ jobExecutor.shutdown()
+ }
+
private def clearJob(job: Job) {
+ var timeCleared = false
+ val time = job.time
jobs.synchronized {
- val jobsOfTime = jobs.get(job.time)
+ val jobsOfTime = jobs.get(time)
if (jobsOfTime.isDefined) {
jobsOfTime.get -= job
if (jobsOfTime.get.isEmpty) {
- jobs -= job.time
+ jobs -= time
+ timeCleared = true
}
} else {
throw new Exception("Job finished for time " + job.time +
" but time does not exist in jobs")
}
}
+ if (timeCleared) {
+ ssc.scheduler.clearOldMetadata(time)
+ }
}
def getPendingTimes(): Array[Time] = {
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index e4152f3a61..64972fd5cd 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -4,6 +4,7 @@ import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv
+import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
@@ -23,7 +24,7 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) ext
*/
private[streaming]
class NetworkInputTracker(
- @transient ssc: StreamingContext,
+ @transient ssc: StreamingContext,
@transient networkInputStreams: Array[NetworkInputDStream[_]])
extends Logging {
@@ -65,12 +66,12 @@ class NetworkInputTracker(
def receive = {
case RegisterReceiver(streamId, receiverActor) => {
if (!networkInputStreamMap.contains(streamId)) {
- throw new Exception("Register received for unexpected id " + streamId)
+ throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
sender ! true
- }
+ }
case AddBlocks(streamId, blockIds, metadata) => {
val tmp = receivedBlockIds.synchronized {
if (!receivedBlockIds.contains(streamId)) {
@@ -85,7 +86,7 @@ class NetworkInputTracker(
}
case DeregisterReceiver(streamId, msg) => {
receiverInfo -= streamId
- logInfo("De-registered receiver for network stream " + streamId
+ logError("De-registered receiver for network stream " + streamId
+ " with message " + msg)
//TODO: Do something about the corresponding NetworkInputDStream
}
@@ -95,8 +96,8 @@ class NetworkInputTracker(
/** This thread class runs all the receivers on the cluster. */
class ReceiverExecutor extends Thread {
val env = ssc.env
-
- override def run() {
+
+ override def run() {
try {
SparkEnv.set(env)
startReceivers()
@@ -113,7 +114,7 @@ class NetworkInputTracker(
*/
def startReceivers() {
val receivers = networkInputStreams.map(nis => {
- val rcvr = nis.createReceiver()
+ val rcvr = nis.getReceiver()
rcvr.setStreamId(nis.id)
rcvr
})
@@ -138,10 +139,14 @@ class NetworkInputTracker(
}
iterator.next().start()
}
+ // Run the dummy Spark job to ensure that all slaves have registered.
+ // This avoids all the receivers to be scheduled on the same node.
+ //ssc.sparkContext.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+
// Distribute the receivers and start them
- ssc.sc.runJob(tempRDD, startReceiver)
+ ssc.sparkContext.runJob(tempRDD, startReceiver)
}
-
+
/** Stops the receivers. */
def stopReceivers() {
// Signal the receivers to stop
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index fbcf061126..5a2dd46fa0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -18,15 +18,15 @@ import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
extends Serializable {
-
- def ssc = self.ssc
+
+ private[streaming] def ssc = self.ssc
private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
new HashPartitioner(numPartitions)
}
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): DStream[(K, Seq[V])] = {
@@ -34,7 +34,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
@@ -42,7 +42,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]]
+ * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
@@ -54,7 +54,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
@@ -63,7 +63,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
@@ -72,7 +72,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
* partitioning of each RDD.
*/
@@ -82,7 +82,7 @@ extends Serializable {
}
/**
- * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+ * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
* information.
*/
@@ -95,15 +95,7 @@ extends Serializable {
}
/**
- * Create a new DStream by counting the number of values of each key in each RDD. Hash
- * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
- */
- def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
- self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
- }
-
- /**
- * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
@@ -115,7 +107,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
* `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -129,7 +121,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -137,7 +129,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream; if not specified
+ * then Spark's default number of partitions will be used
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -155,7 +148,7 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -166,7 +159,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
* generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
* the RDDs with Spark's default number of partitions.
@@ -182,7 +175,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
@@ -201,7 +194,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
@@ -210,10 +203,10 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
+ reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
@@ -222,7 +215,7 @@ extends Serializable {
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
* `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -230,7 +223,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD
+ * in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
@@ -245,118 +239,78 @@ extends Serializable {
}
/**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value of over a new window is calculated using the old window's reduced value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
+ *
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ *
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
- slideDuration: Duration
+ slideDuration: Duration = self.slideDuration,
+ numPartitions: Int = ssc.sc.defaultParallelism,
+ filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)] = {
reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
- }
-
- /**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
- * 1. reduce the new values that entered the window (e.g., adding new counts)
- * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
- * However, it is applicable to only "invertible reduce functions".
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
- */
- def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
- windowDuration: Duration,
- slideDuration: Duration,
- numPartitions: Int
- ): DStream[(K, V)] = {
-
- reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ reduceFunc, invReduceFunc, windowDuration,
+ slideDuration, defaultPartitioner(numPartitions), filterFunc
+ )
}
/**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value of over a new window is calculated using the old window's reduced value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
+ partitioner: Partitioner,
+ filterFunc: ((K, V)) => Boolean
): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+ val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
new ReducedWindowedDStream[K, V](
- self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
- }
-
- /**
- * Create a new DStream by counting the number of values for each key over a window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
- */
- def countByKeyAndWindow(
- windowDuration: Duration,
- slideDuration: Duration,
- numPartitions: Int = self.ssc.sc.defaultParallelism
- ): DStream[(K, Long)] = {
-
- self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
- (x: Long, y: Long) => x + y,
- (x: Long, y: Long) => x - y,
- windowDuration,
- slideDuration,
- numPartitions
+ self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+ windowDuration, slideDuration, partitioner
)
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -370,7 +324,7 @@ extends Serializable {
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param updateFunc State update function. If `this` function returns None, then
@@ -405,7 +359,7 @@ extends Serializable {
}
/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* [[spark.Paxrtitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
@@ -447,7 +401,7 @@ extends Serializable {
}
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
* or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
* key in both RDDs. Partitioner is used to partition each generated RDD.
*/
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index b77986a3ba..1c4b22a898 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -9,11 +9,8 @@ class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
- val graph = ssc.graph
-
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
-
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
} else {
@@ -23,57 +20,93 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => generateRDDs(new Time(longTime)))
+ longTime => generateJobs(new Time(longTime)))
+ val graph = ssc.graph
+ var latestTime: Time = null
- def start() {
- // If context was started from checkpoint, then restart timer such that
- // this timer's triggers occur at the same time as the original timer.
- // Otherwise just start the timer from scratch, and initialize graph based
- // on this first trigger time of the timer.
+ def start() = synchronized {
if (ssc.isCheckpointPresent) {
- // If manual clock is being used for testing, then
- // either set the manual clock to the last checkpointed time,
- // or if the property is defined set it to that time
- if (clock.isInstanceOf[ManualClock]) {
- val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
- clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
- }
- // Reschedule the batches that were received but not processed before failure
- ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
- // Restart the timer
- timer.restart(graph.zeroTime.milliseconds)
- logInfo("Scheduler's timer restarted")
+ restart()
} else {
- val firstTime = new Time(timer.start())
- graph.start(firstTime - ssc.graph.batchDuration)
- logInfo("Scheduler's timer started")
+ startFirstTime()
}
logInfo("Scheduler started")
}
- def stop() {
+ def stop() = synchronized {
timer.stop()
- graph.stop()
+ jobManager.stop()
+ if (checkpointWriter != null) checkpointWriter.stop()
+ ssc.graph.stop()
logInfo("Scheduler stopped")
}
-
- private def generateRDDs(time: Time) {
+
+ private def startFirstTime() {
+ val startTime = new Time(timer.getStartTime())
+ graph.start(startTime - graph.batchDuration)
+ timer.start(startTime.milliseconds)
+ logInfo("Scheduler's timer started at " + startTime)
+ }
+
+ private def restart() {
+
+ // If manual clock is being used for testing, then
+ // either set the manual clock to the last checkpointed time,
+ // or if the property is defined set it to that time
+ if (clock.isInstanceOf[ManualClock]) {
+ val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
+ val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
+ }
+
+ val batchDuration = ssc.graph.batchDuration
+
+ // Batches when the master was down, that is,
+ // between the checkpoint and current restart time
+ val checkpointTime = ssc.initialCheckpoint.checkpointTime
+ val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
+ val downTimes = checkpointTime.until(restartTime, batchDuration)
+ logInfo("Batches during down time: " + downTimes.mkString(", "))
+
+ // Batches that were unprocessed before failure
+ val pendingTimes = ssc.initialCheckpoint.pendingTimes
+ logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+ // Reschedule jobs for these times
+ val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+ logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+ timesToReschedule.foreach(time =>
+ graph.generateJobs(time).foreach(jobManager.runJob)
+ )
+
+ // Restart the timer
+ timer.start(restartTime.milliseconds)
+ logInfo("Scheduler's timer restarted at " + restartTime)
+ }
+
+ /** Generate jobs and perform checkpoint for the given `time`. */
+ def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
- graph.generateRDDs(time).foreach(jobManager.runJob)
- graph.forgetOldRDDs(time)
+ graph.generateJobs(time).foreach(jobManager.runJob)
+ latestTime = time
+ doCheckpoint(time)
+ }
+
+ /**
+ * Clear old metadata assuming jobs of `time` have finished processing.
+ * And also perform checkpoint.
+ */
+ def clearOldMetadata(time: Time) {
+ ssc.graph.clearOldMetadata(time)
doCheckpoint(time)
- logInfo("Generated RDDs for time " + time)
}
- private def doCheckpoint(time: Time) {
+ /** Perform checkpoint for the give `time`. */
+ def doCheckpoint(time: Time) = synchronized {
if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
- val startTime = System.currentTimeMillis()
+ logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
- val stopTime = System.currentTimeMillis()
- logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
}
}
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8cfbec51d2..a9684c5772 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,17 @@
package spark.streaming
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
+import spark.streaming.receivers.ActorReceiver
+import spark.streaming.receivers.ReceiverSupervisorStrategy
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
+import spark.streaming.receivers.ActorReceiver
+
import scala.collection.mutable.Queue
@@ -17,6 +24,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
+import twitter4j.Status
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -30,14 +38,14 @@ class StreamingContext private (
) extends Logging {
/**
- * Creates a StreamingContext using an existing SparkContext.
+ * Create a StreamingContext using an existing SparkContext.
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
/**
- * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param frameworkName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -46,7 +54,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Re-creates a StreamingContext from a checkpoint file.
+ * Re-create a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -101,12 +109,12 @@ class StreamingContext private (
protected[streaming] var scheduler: Scheduler = null
/**
- * Returns the associated Spark context
+ * Return the associated Spark context
*/
def sparkContext = sc
/**
- * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
+ * Set each DStreams in this context to remember RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and releases them for garbage
* collection. This method allows the developer to specify how to long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
@@ -117,19 +125,16 @@ class StreamingContext private (
}
/**
- * Sets the context to periodically checkpoint the DStream operations for master
- * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * Set the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
- * @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Duration = null) {
+ def checkpoint(directory: String) {
if (directory != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
checkpointDir = directory
- checkpointDuration = interval
} else {
checkpointDir = null
- checkpointDuration = null
}
}
@@ -140,6 +145,36 @@ class StreamingContext private (
protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
/**
+ * Create an input stream with any arbitrary user implemented network receiver.
+ * @param receiver Custom implementation of NetworkReceiver
+ */
+ def networkStream[T: ClassManifest](
+ receiver: NetworkReceiver[T]): DStream[T] = {
+ val inputStream = new PluggableInputDStream[T](this,
+ receiver)
+ graph.addInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of data received and actorStream
+ * should be same.
+ */
+ def actorStream[T: ClassManifest](
+ props: Props, name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+ networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+ }
+
+ /**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
@@ -147,7 +182,8 @@ class StreamingContext private (
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
* By default the value is pulled from zookeper.
- * @param storageLevel RDD storage level. Defaults to memory-only.
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def kafkaStream[T: ClassManifest](
zkQuorum: String,
@@ -162,24 +198,24 @@ class StreamingContext private (
}
/**
- * Create a input stream from network source hostname:port. Data is received using
- * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
+ * Create a input stream from TCP source hostname:port. Data is received using
+ * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def networkTextStream(
+ def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
- networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
+ socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
/**
- * Create a input stream from network source hostname:port. Data is received using
+ * Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
@@ -188,7 +224,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def networkStream[T: ClassManifest](
+ def socketStream[T: ClassManifest](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
@@ -200,7 +236,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream from a Flume source.
+ * Create a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
@@ -236,7 +272,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
@@ -255,7 +291,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
@@ -274,9 +310,8 @@ class StreamingContext private (
inputStream
}
-
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
@@ -286,7 +321,25 @@ class StreamingContext private (
}
/**
- * Creates an input stream from a queue of RDDs. In each batch,
+ * Create a input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -300,7 +353,7 @@ class StreamingContext private (
}
/**
- * Creates an input stream from a queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -325,7 +378,7 @@ class StreamingContext private (
}
/**
- * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
@@ -333,7 +386,7 @@ class StreamingContext private (
}
/**
- * Registers an output stream that will be computed every interval
+ * Register an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
@@ -351,7 +404,7 @@ class StreamingContext private (
}
/**
- * Starts the execution of the streams.
+ * Start the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointDuration == null && graph != null) {
@@ -379,7 +432,7 @@ class StreamingContext private (
}
/**
- * Stops the execution of the streams.
+ * Stop the execution of the streams.
*/
def stop() {
try {
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 5daeb761dd..f14decf08b 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -37,6 +37,19 @@ case class Time(private val millis: Long) {
def max(that: Time): Time = if (this > that) this else that
+ def until(that: Time, interval: Duration): Seq[Time] = {
+ (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
+ def to(that: Time, interval: Duration): Seq[Time] = {
+ (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
+
override def toString: String = (millis.toString + " ms")
+}
+
+object Time {
+ val ordering = Ordering.by((time: Time) => time.millis)
} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 2e7466b16c..30985b4ebc 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -36,7 +36,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
def cache(): JavaDStream[T] = dstream.cache()
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
- def persist(): JavaDStream[T] = dstream.cache()
+ def persist(): JavaDStream[T] = dstream.persist()
/** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
@@ -50,34 +50,27 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * The new DStream generates RDDs with the same interval as this DStream.
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
- * @return
*/
def window(windowDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration)
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowDuration duration (i.e., width) of the window;
- * must be a multiple of this DStream's interval
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's interval
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration, slideDuration)
/**
- * Return a new DStream which computed based on tumbling window on this DStream.
- * This is equivalent to window(batchDuration, batchDuration).
- * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
- */
- def tumble(batchDuration: Duration): JavaDStream[T] =
- dstream.tumble(batchDuration)
-
- /**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865a..1c1ba05ff9 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -34,6 +34,26 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def count(): JavaDStream[JLong] = dstream.count()
/**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ */
+ def countByValue(): JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByValue())
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
+ }
+
+
+ /**
* Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
@@ -43,6 +63,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(
+ dstream.countByValueAndWindow(windowDuration, slideDuration))
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+ : JavaPairDStream[T, JLong] = {
+ JavaPairDStream.scalaToJavaLong(
+ dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
+ }
+
+ /**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
@@ -114,8 +167,38 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
+ dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
+ }
+
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..952ca657bf 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -25,17 +25,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
// Methods common to all DStream's
// =======================================================================
- /** Returns a new DStream containing only the elements that satisfy a predicate. */
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
dstream.filter((x => f(x).booleanValue()))
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
- def persist(): JavaPairDStream[K, V] = dstream.cache()
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaPairDStream[K, V] = dstream.persist()
- /** Persists the RDDs of this DStream with the given storage level */
+ /** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
/** Method that generates a RDD for the given Duration */
@@ -67,15 +67,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.window(windowDuration, slideDuration)
/**
- * Returns a new DStream which computed based on tumbling window on this DStream.
- * This is equivalent to window(batchDuration, batchDuration).
- * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
- */
- def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
- dstream.tumble(batchDuration)
-
- /**
- * Returns a new DStream by unifying data of another DStream with this DStream.
+ * Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
@@ -86,21 +78,21 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
// =======================================================================
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
/**
- * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
@@ -109,7 +101,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
@@ -117,7 +109,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKey(func)
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
@@ -125,7 +117,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKey(func, numPartitions)
/**
- * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
* partitioning of each RDD.
*/
@@ -149,24 +141,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by counting the number of values of each key in each RDD. Hash
- * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
- */
- def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
- }
-
-
- /**
- * Create a new DStream by counting the number of values of each key in each RDD. Hash
- * partitioning is used to generate the RDDs with the default number of partitions.
- */
- def countByKey(): JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKey());
- }
-
- /**
- * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
@@ -178,7 +153,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
* `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -193,7 +168,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -210,7 +185,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -243,7 +218,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
@@ -262,7 +237,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
@@ -283,7 +258,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
* `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
@@ -303,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Return a new DStream by reducing over a using incremental computation.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -328,7 +303,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -342,25 +317,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- numPartitions: Int
+ numPartitions: Int,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- numPartitions)
+ numPartitions,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -374,49 +355,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
- ): JavaPairDStream[K, V] = {
+ partitioner: Partitioner,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- partitioner)
- }
-
- /**
- * Create a new DStream by counting the number of values for each key over a window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- */
- def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
- : JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
- }
-
- /**
- * Create a new DStream by counting the number of values for each key over a window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
- */
- def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
- : JavaPairDStream[K, Long] = {
- dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
+ partitioner,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 5bbf2b084f..d9a676819a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -130,7 +130,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
*/
def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
- ssc.networkTextStream(hostname, port, storageLevel)
+ ssc.socketTextStream(hostname, port, storageLevel)
}
/**
@@ -140,8 +140,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
- def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
- ssc.networkTextStream(hostname, port)
+ def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ ssc.socketTextStream(hostname, port)
}
/**
@@ -154,7 +154,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def networkStream[T](
+ def socketStream[T](
hostname: String,
port: Int,
converter: JFunction[InputStream, java.lang.Iterable[T]],
@@ -163,7 +163,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def fn = (x: InputStream) => converter.apply(x).toIterator
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.networkStream(hostname, port, fn, storageLevel)
+ ssc.socketStream(hostname, port, fn, storageLevel)
}
/**
@@ -314,12 +314,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Sets the context to periodically checkpoint the DStream operations for master
- * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
- * @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Duration = null) {
- ssc.checkpoint(directory, interval)
+ def checkpoint(directory: String) {
+ ssc.checkpoint(directory)
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index c6ffb252ce..41b9bd9461 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -21,19 +21,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+ // Latest file mod time seen till any point of time
private val lastModTimeFiles = new HashSet[String]()
private var lastModTime = 0L
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
- @transient private var files = new HashMap[Time, Array[String]]
+ @transient private[streaming] var files = new HashMap[Time, Array[String]]
override def start() {
if (newFilesOnly) {
- lastModTime = System.currentTimeMillis()
+ lastModTime = graph.zeroTime.milliseconds
} else {
lastModTime = 0
}
+ logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@@ -43,38 +45,50 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
- * granularity of seconds. Hence, new files may have the same modification time as the
- * latest modification time in the previous call to this method and the list of files
- * maintained is used to filter the one that have been processed.
+ * granularity of seconds. And new files may have the same modification time as the
+ * latest modification time in the previous call to this method yet was not reported in
+ * the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
+
// Create the filter for selecting new files
val newFilter = new PathFilter() {
+ // Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter(path)) {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
return false
- } else {
+ } else { // Accept file only if
val modTime = fs.getFileStatus(path).getModificationTime()
- if (modTime < lastModTime){
- return false
+ logDebug("Mod time for " + path + " is " + modTime)
+ if (modTime < lastModTime) {
+ logDebug("Mod time less than last mod time")
+ return false // If the file was created before the last time it was called
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
- return false
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false // If the file was created exactly as lastModTime but not reported yet
+ } else if (modTime > validTime.milliseconds) {
+ logDebug("Mod time more than valid time")
+ return false // If the file was created after the time this function call requires
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
+ logDebug("Latest mod time updated to " + latestModTime)
}
latestModTimeFiles += path.toString
+ logDebug("Accepted " + path)
return true
}
}
}
-
+ logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
- logInfo("New files: " + newFiles.mkString(", "))
+ logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
@@ -82,17 +96,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
lastModTimeFiles.clear()
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
+ logDebug("Last mod time updated to " + lastModTime)
}
files += ((validTime, newFiles))
Some(filesToRDD(newFiles))
}
- /** Forget the old time-to-files mappings along with old RDDs */
- protected[streaming] override def forgetOldMetadata(time: Time) {
- super.forgetOldMetadata(time)
- val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration))
- files --= filesToBeRemoved.keys
- logInfo("Forgot " + filesToBeRemoved.size + " files from " + this)
+ /** Clear the old time-to-files mappings along with old RDDs */
+ protected[streaming] override def clearOldMetadata(time: Time) {
+ super.clearOldMetadata(time)
+ val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+ files --= oldFiles.keys
+ logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+ (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+ logDebug("Cleared files are:\n" +
+ oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
/** Generate one RDD from an array of files */
@@ -128,7 +146,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
private[streaming]
class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
- def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+ def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
override def update() {
hadoopFiles.clear()
@@ -139,14 +157,20 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
override def restore() {
hadoopFiles.foreach {
- case (time, files) => {
- logInfo("Restoring Hadoop RDD for time " + time + " from files " +
- files.mkString("[", ",", "]") )
- files
- generatedRDDs += ((time, filesToRDD(files)))
+ case (t, f) => {
+ // Restore the metadata in both files and generatedRDDs
+ logInfo("Restoring files for time " + t + " - " +
+ f.mkString("[", ", ", "]") )
+ files += ((t, f))
+ generatedRDDs += ((t, filesToRDD(f)))
}
}
}
+
+ override def toString() = {
+ "[\n" + hadoopFiles.size + " file sets\n" +
+ hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
+ }
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index efc7058480..c9644b3a83 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -25,7 +25,7 @@ class FlumeInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
- override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -134,4 +134,4 @@ class FlumeReceiver(
}
override def getLocationPreference = Some(host)
-} \ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
index 980ca5177e..a4db44a608 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -1,10 +1,42 @@
package spark.streaming.dstream
-import spark.streaming.{Duration, StreamingContext, DStream}
+import spark.streaming.{Time, Duration, StreamingContext, DStream}
+/**
+ * This is the abstract base class for all input streams. This class provides to methods
+ * start() and stop() which called by the scheduler to start and stop receiving data/
+ * Input streams that can generated RDDs from new data just by running a service on
+ * the driver node (that is, without running a receiver onworker nodes) can be
+ * implemented by directly subclassing this InputDStream. For example,
+ * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for
+ * new files and generates RDDs on the new files. For implementing input streams
+ * that requires running a receiver on the worker nodes, use NetworkInputDStream
+ * as the parent class.
+ */
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
+ var lastValidTime: Time = null
+
+ /**
+ * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
+ * Additionally it also ensures valid times are in strictly increasing order.
+ * This ensures that InputDStream.compute() is called strictly on increasing
+ * times.
+ */
+ override protected def isTimeValid(time: Time): Boolean = {
+ if (!super.isTimeValid(time)) {
+ false // Time not valid
+ } else {
+ // Time is valid, but check it it is more than lastValidTime
+ if (lastValidTime == null || lastValidTime <= time) {
+ logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime)
+ }
+ lastValidTime = time
+ true
+ }
+ }
+
override def dependencies = List()
override def slideDuration: Duration = {
@@ -13,7 +45,9 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
ssc.graph.batchDuration
}
+ /** Method called to start receiving data. Subclasses must implement this method. */
def start()
+ /** Method called to stop receiving data. Subclasses must implement this method. */
def stop()
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 4f8c8b9d10..dc7139cc27 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -41,7 +41,8 @@ class KafkaInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
- def createReceiver(): NetworkReceiver[T] = {
+
+ def getReceiver(): NetworkReceiver[T] = {
new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
@@ -73,7 +74,7 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
-
+
// Zookeper connection properties
val props = new Properties()
props.put("zk.connect", zkQuorum)
@@ -104,7 +105,7 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
offsets.foreach { case(key, offset) =>
val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
val partitionName = key.brokerId + "-" + key.partId
- updatePersistentPath(consumerConnector.zkClient,
+ updatePersistentPath(consumerConnector.zkClient,
topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
}
}
@@ -115,10 +116,10 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
logInfo("Starting MessageHandler.")
stream.takeWhile { msgAndMetadata =>
blockGenerator += msgAndMetadata.message
-
// Keep on handling messages
+
true
- }
+ }
}
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 8c322dd698..7385474963 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue
/**
* Abstract class for defining any InputDStream that has to start a receiver on worker
* nodes to receive external data. Specific implementations of NetworkInputDStream must
- * define the createReceiver() function that creates the receiver object of type
+ * define the getReceiver() function that gets the receiver object of type
* [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
* data.
* @param ssc_ Streaming context that will execute this input stream
@@ -34,11 +34,11 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
val id = ssc.getNewNetworkStreamId()
/**
- * Creates the receiver object that will be sent to the worker nodes
+ * Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
* of a NetworkInputDStream.
*/
- def createReceiver(): NetworkReceiver[T]
+ def getReceiver(): NetworkReceiver[T]
// Nothing to start or stop as both taken care of by the NetworkInputTracker.
def start() {}
@@ -46,8 +46,15 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
def stop() {}
override def compute(validTime: Time): Option[RDD[T]] = {
- val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
- Some(new BlockRDD[T](ssc.sc, blockIds))
+ // If this is called for any time before the start time of the context,
+ // then this returns an empty RDD. This may happen when recovering from a
+ // master failure
+ if (validTime >= graph.startTime) {
+ val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
+ Some(new BlockRDD[T](ssc.sc, blockIds))
+ } else {
+ Some(new BlockRDD[T](ssc.sc, Array[String]()))
+ }
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
new file mode 100644
index 0000000000..3c2a81947b
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
@@ -0,0 +1,13 @@
+package spark.streaming.dstream
+
+import spark.streaming.StreamingContext
+
+private[streaming]
+class PluggableInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ receiver
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 04e6b69b7b..1b2fa56779 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -25,7 +25,7 @@ class RawInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 733d5c4a25..aa5a71e1ed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -3,7 +3,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.CoGroupedRDD
+import spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
@@ -15,7 +15,8 @@ private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ filterFunc: Option[((K, V)) => Boolean],
_windowDuration: Duration,
_slideDuration: Duration,
partitioner: Partitioner
@@ -87,22 +88,25 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
//
// Get the RDDs of the reduced values in "old time steps"
- val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
+ val oldRDDs =
+ reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
- val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
+ val newRDDs =
+ reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
logDebug("# new RDDs = " + newRDDs.size)
// Get the RDD of the reduced value of the previous window
- val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+ val previousWindowRDD =
+ getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
// Make the list of RDDs that needs to cogrouped together for reducing their reduced values
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
- //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ val cogroupedRDD =
+ new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
@@ -114,7 +118,9 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
// Getting reduced values "old time steps" that will be removed from current window
val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
- val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+ val newValues =
+ (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
@@ -140,10 +146,12 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
- Some(mergedValuesRDD)
+ if (filterFunc.isDefined) {
+ Some(mergedValuesRDD.filter(filterFunc.get))
+ } else {
+ Some(mergedValuesRDD)
+ }
}
-
-
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index d42027092b..4af839ad7f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -15,7 +15,7 @@ class SocketInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index 99ed4cdc1c..c697498862 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -1,12 +1,11 @@
-package spark.streaming.examples.twitter
+package spark.streaming.dstream
import spark._
import spark.streaming._
-import dstream.{NetworkReceiver, NetworkInputDStream}
import storage.StorageLevel
+
import twitter4j._
import twitter4j.auth.BasicAuthorization
-import collection.JavaConversions._
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -14,19 +13,21 @@ import collection.JavaConversions._
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
*/
+private[streaming]
class TwitterInputDStream(
@transient ssc_ : StreamingContext,
username: String,
password: String,
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
+ ) extends NetworkInputDStream[Status](ssc_) {
- override def createReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(username, password, filters, storageLevel)
}
}
+private[streaming]
class TwitterReceiver(
username: String,
password: String,
@@ -50,7 +51,7 @@ class TwitterReceiver(
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {}
+ def onException(e: Exception) { stopOnError(e) }
})
val query: FilterQuery = new FilterQuery
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
new file mode 100644
index 0000000000..b3201d0b28
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -0,0 +1,153 @@
+package spark.streaming.receivers
+
+import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
+import akka.actor.{ actorRef2Scala, ActorRef }
+import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+
+import spark.storage.StorageLevel
+import spark.streaming.dstream.NetworkReceiver
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/** A helper with set of defaults for supervisor strategy **/
+object ReceiverSupervisorStrategy {
+
+ import akka.util.duration._
+ import akka.actor.SupervisorStrategy._
+
+ val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ 15 millis) {
+ case _: RuntimeException ⇒ Restart
+ case _: Exception ⇒ Escalate
+ }
+}
+
+/**
+ * A receiver trait to be mixed in with your Actor to gain access to
+ * pushBlock API.
+ *
+ * @example {{{
+ * class MyActor extends Actor with Receiver{
+ * def receive {
+ * case anything :String ⇒ pushBlock(anything)
+ * }
+ * }
+ * //Can be plugged in actorStream as follows
+ * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ *
+ * }}}
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of push block and InputDStream
+ * should be same.
+ *
+ */
+trait Receiver { self: Actor ⇒
+ def pushBlock[T: ClassManifest](iter: Iterator[T]) {
+ context.parent ! Data(iter)
+ }
+
+ def pushBlock[T: ClassManifest](data: T) {
+ context.parent ! Data(data)
+ }
+
+}
+
+/**
+ * Statistics for querying the supervisor about state of workers
+ */
+case class Statistics(numberOfMsgs: Int,
+ numberOfWorkers: Int,
+ numberOfHiccups: Int,
+ otherInfo: String)
+
+/** Case class to receive data sent by child actors **/
+private[streaming] case class Data[T: ClassManifest](data: T)
+
+/**
+ * Provides Actors as receivers for receiving stream.
+ *
+ * As Actors can also be used to receive data from almost any stream source.
+ * A nice set of abstraction(s) for actors as receivers is already provided for
+ * a few general cases. It is thus exposed as an API where user may come with
+ * his own Actor to run as receiver for Spark Streaming input source.
+ *
+ * This starts a supervisor actor which starts workers and also provides
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
+ * Here's a way to start more supervisor/workers as its children.
+ *
+ * @example {{{
+ * context.parent ! Props(new Supervisor)
+ * }}} OR {{{
+ * context.parent ! Props(new Worker,"Worker")
+ * }}}
+ *
+ *
+ */
+private[streaming] class ActorReceiver[T: ClassManifest](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ receiverSupervisorStrategy: SupervisorStrategy)
+ extends NetworkReceiver[T] {
+
+ protected lazy val blocksGenerator: BlockGenerator =
+ new BlockGenerator(storageLevel)
+
+ protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+ "Supervisor" + streamId)
+
+ private class Supervisor extends Actor {
+
+ override val supervisorStrategy = receiverSupervisorStrategy
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+
+ val n: AtomicInteger = new AtomicInteger(0)
+ val hiccups: AtomicInteger = new AtomicInteger(0)
+
+ def receive = {
+
+ case Data(iter: Iterator[_]) ⇒ pushBlock(iter.asInstanceOf[Iterator[T]])
+
+ case Data(msg) ⇒
+ blocksGenerator += msg.asInstanceOf[T]
+ n.incrementAndGet
+
+ case props: Props ⇒
+ val worker = context.actorOf(props)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case (props: Props, name: String) ⇒
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+ case _: Statistics ⇒
+ val workers = context.children
+ sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
+ }
+ }
+
+ protected def pushBlock(iter: Iterator[T]) {
+ pushBlock("block-" + streamId + "-" + System.nanoTime(),
+ iter, null, storageLevel)
+ }
+
+ protected def onStart() = {
+ blocksGenerator.start()
+ supervisor
+ logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
+ }
+
+ protected def onStop() = {
+ supervisor ! PoisonPill
+ }
+
+}
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
new file mode 100644
index 0000000000..bdd9f4d753
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -0,0 +1,392 @@
+package spark.streaming.util
+
+import spark.{Logging, RDD}
+import spark.streaming._
+import spark.streaming.dstream.ForEachDStream
+import StreamingContext._
+
+import scala.util.Random
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import java.io.{File, ObjectInputStream, IOException}
+import java.util.UUID
+
+import com.google.common.io.Files
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+
+private[streaming]
+object MasterFailureTest extends Logging {
+ initLogging()
+
+ @volatile var killed = false
+ @volatile var killCount = 0
+
+ def main(args: Array[String]) {
+ if (args.size < 2) {
+ println(
+ "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+ System.exit(1)
+ }
+ val directory = args(0)
+ val numBatches = args(1).toInt
+ val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1)
+
+ println("\n\n========================= MAP TEST =========================\n\n")
+ testMap(directory, numBatches, batchDuration)
+
+ println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
+ testUpdateStateByKey(directory, numBatches, batchDuration)
+
+ println("\n\nSUCCESS\n\n")
+ }
+
+ def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val input = (1 to numBatches).map(_.toString).toSeq
+ // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val expectedOutput = (1 to numBatches)
+
+ val operation = (st: DStream[String]) => st.map(_.toInt)
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size)
+ logInfo(expectedOutput.mkString("[", ",", "]"))
+ logInfo("Output, size = " + output.size)
+ logInfo(output.mkString("[", ",", "]"))
+
+ // Verify whether all the values of the expected output is present
+ // in the output
+ assert(output.distinct.toSet == expectedOutput.toSet)
+ }
+
+
+ def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
+ val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+ // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+ val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
+
+ val operation = (st: DStream[String]) => {
+ val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+ Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
+ }
+ st.flatMap(_.split(" "))
+ .map(x => (x, 1L))
+ .updateStateByKey[Long](updateFunc)
+ .checkpoint(batchDuration * 5)
+ }
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput)
+ logInfo("Output, size = " + output.size + "\n" + output)
+
+ // Verify whether all the values in the output are among the expected output values
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+
+ // Verify whether the last expected output value has been generated, there by
+ // confirming that none of the inputs have been missed
+ assert(output.last == expectedOutput.last)
+ }
+
+ /**
+ * Tests stream operation with multiple master failures, and verifies whether the
+ * final set of output values is as expected or not.
+ */
+ def testOperation[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ input: Seq[String],
+ operation: DStream[String] => DStream[T],
+ expectedOutput: Seq[T]
+ ): Seq[T] = {
+
+ // Just making sure that the expected output does not have duplicates
+ assert(expectedOutput.distinct.toSet == expectedOutput.toSet)
+
+ // Setup the stream computation with the given operation
+ val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation)
+
+ // Start generating files in the a different thread
+ val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds)
+ fileGeneratingThread.start()
+
+ // Run the streams and repeatedly kill it until the last expected output
+ // has been generated, or until it has run for twice the expected time
+ val lastExpectedOutput = expectedOutput.last
+ val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
+ val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
+
+ // Delete directories
+ fileGeneratingThread.join()
+ val fs = checkpointDir.getFileSystem(new Configuration())
+ fs.delete(checkpointDir, true)
+ fs.delete(testDir, true)
+ logInfo("Finished test after " + killCount + " failures")
+ mergedOutput
+ }
+
+ /**
+ * Sets up the stream computation with the given operation, directory (local or HDFS),
+ * and batch duration. Returns the streaming context and the directory to which
+ * files should be written for testing.
+ */
+ private def setupStreams[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ operation: DStream[String] => DStream[T]
+ ): (StreamingContext, Path, Path) = {
+ // Reset all state
+ reset()
+
+ // Create the directories for this test
+ val uuid = UUID.randomUUID().toString
+ val rootDir = new Path(directory, uuid)
+ val fs = rootDir.getFileSystem(new Configuration())
+ val checkpointDir = new Path(rootDir, "checkpoint")
+ val testDir = new Path(rootDir, "test")
+ fs.mkdirs(checkpointDir)
+ fs.mkdirs(testDir)
+
+ // Setup the streaming computation with the given operation
+ System.clearProperty("spark.driver.port")
+ var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration)
+ ssc.checkpoint(checkpointDir.toString)
+ val inputStream = ssc.textFileStream(testDir.toString)
+ val operatedStream = operation(inputStream)
+ val outputStream = new TestOutputStream(operatedStream)
+ ssc.registerOutputStream(outputStream)
+ (ssc, checkpointDir, testDir)
+ }
+
+
+ /**
+ * Repeatedly starts and kills the streaming context until timed out or
+ * the last expected output is generated. Finally, return
+ */
+ private def runStreams[T: ClassManifest](
+ ssc_ : StreamingContext,
+ lastExpectedOutput: T,
+ maxTimeToRun: Long
+ ): Seq[T] = {
+
+ var ssc = ssc_
+ var totalTimeRan = 0L
+ var isLastOutputGenerated = false
+ var isTimedOut = false
+ val mergedOutput = new ArrayBuffer[T]()
+ val checkpointDir = ssc.checkpointDir
+ var batchDuration = ssc.graph.batchDuration
+
+ while(!isLastOutputGenerated && !isTimedOut) {
+ // Get the output buffer
+ val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
+ def output = outputBuffer.flatMap(x => x)
+
+ // Start the thread to kill the streaming after some time
+ killed = false
+ val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
+ killingThread.start()
+
+ var timeRan = 0L
+ try {
+ // Start the streaming computation and let it run while ...
+ // (i) StreamingContext has not been shut down yet
+ // (ii) The last expected output has not been generated yet
+ // (iii) Its not timed out yet
+ System.clearProperty("spark.streaming.clock")
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val startTime = System.currentTimeMillis()
+ while (!killed && !isLastOutputGenerated && !isTimedOut) {
+ Thread.sleep(100)
+ timeRan = System.currentTimeMillis() - startTime
+ isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
+ isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
+ }
+ } catch {
+ case e: Exception => logError("Error running streaming context", e)
+ }
+ if (killingThread.isAlive) killingThread.interrupt()
+ ssc.stop()
+
+ logInfo("Has been killed = " + killed)
+ logInfo("Is last output generated = " + isLastOutputGenerated)
+ logInfo("Is timed out = " + isTimedOut)
+
+ // Verify whether the output of each batch has only one element or no element
+ // and then merge the new output with all the earlier output
+ mergedOutput ++= output
+ totalTimeRan += timeRan
+ logInfo("New output = " + output)
+ logInfo("Merged output = " + mergedOutput)
+ logInfo("Time ran = " + timeRan)
+ logInfo("Total time ran = " + totalTimeRan)
+
+ if (!isLastOutputGenerated && !isTimedOut) {
+ val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10)
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Restarting stream computation in " + sleepTime + " ms " +
+ "\n-------------------------------------------\n"
+ )
+ Thread.sleep(sleepTime)
+ // Recreate the streaming context from checkpoint
+ ssc = new StreamingContext(checkpointDir)
+ }
+ }
+ mergedOutput
+ }
+
+ /**
+ * Verifies the output value are the same as expected. Since failures can lead to
+ * a batch being processed twice, a batches output may appear more than once
+ * consecutively. To avoid getting confused with those, we eliminate consecutive
+ * duplicate batch outputs of values from the `output`. As a result, the
+ * expected output should not have consecutive batches with the same values as output.
+ */
+ private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
+ // Verify whether expected outputs do not consecutive batches with same output
+ for (i <- 0 until expectedOutput.size - 1) {
+ assert(expectedOutput(i) != expectedOutput(i+1),
+ "Expected output has consecutive duplicate sequence of values")
+ }
+
+ // Log the output
+ println("Expected output, size = " + expectedOutput.size)
+ println(expectedOutput.mkString("[", ",", "]"))
+ println("Output, size = " + output.size)
+ println(output.mkString("[", ",", "]"))
+
+ // Match the output with the expected output
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ }
+
+ /** Resets counter to prepare for the test */
+ private def reset() {
+ killed = false
+ killCount = 0
+ }
+}
+
+/**
+ * This is a output stream just for testing. All the output is collected into a
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ */
+private[streaming]
+class TestOutputStream[T: ClassManifest](
+ parent: DStream[T],
+ val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ ) extends ForEachDStream[T](
+ parent,
+ (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output += collected
+ }
+ ) {
+
+ // This is to clear the output buffer every it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
+
+
+/**
+ * Thread to kill streaming context after a random period of time.
+ */
+private[streaming]
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ try {
+ // If it is the first killing, then allow the first checkpoint to be created
+ var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
+ val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
+ logInfo("Kill wait time = " + killWaitTime)
+ Thread.sleep(killWaitTime)
+ logInfo(
+ "\n---------------------------------------\n" +
+ "Killing streaming context after " + killWaitTime + " ms" +
+ "\n---------------------------------------\n"
+ )
+ if (ssc != null) {
+ ssc.stop()
+ MasterFailureTest.killed = true
+ MasterFailureTest.killCount += 1
+ }
+ logInfo("Killing thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("Killing thread interrupted")
+ case e: Exception => logWarning("Exception in killing thread", e)
+ }
+
+ }
+}
+
+
+/**
+ * Thread to generate input files periodically with the desired text.
+ */
+private[streaming]
+class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
+ extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ val localTestDir = Files.createTempDir()
+ var fs = testDir.getFileSystem(new Configuration())
+ val maxTries = 3
+ try {
+ Thread.sleep(5000) // To make sure that all the streaming context has been set up
+ for (i <- 0 until input.size) {
+ // Write the data to a local file and then move it to the target test directory
+ val localFile = new File(localTestDir, (i+1).toString)
+ val hadoopFile = new Path(testDir, (i+1).toString)
+ FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+ var tries = 0
+ var done = false
+ while (!done && tries < maxTries) {
+ tries += 1
+ try {
+ fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ done = true
+ } catch {
+ case ioe: IOException => {
+ fs = testDir.getFileSystem(new Configuration())
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ }
+ }
+ }
+ if (!done)
+ logError("Could not generate file " + hadoopFile)
+ else
+ logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ Thread.sleep(interval)
+ localFile.delete()
+ }
+ logInfo("File generating thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("File generating thread interrupted")
+ case e: Exception => logWarning("File generating in killing thread", e)
+ } finally {
+ fs.close()
+ }
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index db715cc295..8e10276deb 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -3,9 +3,9 @@ package spark.streaming.util
private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
- val minPollTime = 25L
+ private val minPollTime = 25L
- val pollTime = {
+ private val pollTime = {
if (period / 10.0 > minPollTime) {
(period / 10.0).toLong
} else {
@@ -13,11 +13,20 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
- val thread = new Thread() {
+ private val thread = new Thread() {
override def run() { loop }
}
- var nextTime = 0L
+ private var nextTime = 0L
+
+ def getStartTime(): Long = {
+ (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ }
+
+ def getRestartTime(originalStartTime: Long): Long = {
+ val gap = clock.currentTime - originalStartTime
+ (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
+ }
def start(startTime: Long): Long = {
nextTime = startTime
@@ -26,21 +35,14 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
def start(): Long = {
- val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
- start(startTime)
+ start(getStartTime())
}
- def restart(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
- val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
- start(newStartTime)
- }
-
- def stop() {
+ def stop() {
thread.interrupt()
}
- def loop() {
+ private def loop() {
try {
while (true) {
clock.waitTillTime(nextTime)
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index fbe4af4597..5d510fd89f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -33,8 +33,9 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- ssc.checkpoint("checkpoint", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
}
@After
@@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
}
- /*
+
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -134,29 +135,6 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testTumble() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9),
- Arrays.asList(10,11,12),
- Arrays.asList(13,14,15),
- Arrays.asList(16,17,18));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,2,3,4,5,6),
- Arrays.asList(7,8,9,10,11,12),
- Arrays.asList(13,14,15,16,17,18));
-
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.tumble(new Duration(2000));
- JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
@@ -434,7 +412,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result);
}
- */
+
/*
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
@@ -450,7 +428,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, actual);
}
- /*
+
// PairDStream Functions
@Test
public void testPairFilter() {
@@ -583,50 +561,73 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKey() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValue() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
- JavaPairDStream<String, Long> counted = pairStream.countByKey();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Long> counted = stream.countByValue();
JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testGroupByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
- List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california",
- Arrays.asList("sharks", "ducks", "dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ )
+ );
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> groupWindowed =
+ JavaPairDStream<String, List<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- Assert.assertEquals(expected, result);
+ assert(result.size() == expected.size());
+ for (int i = 0; i < result.size(); i++) {
+ assert(convert(result.get(i)).equals(convert(expected.get(i))));
+ }
+ }
+
+ private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
+ newListOfTuples.add(convert(tuple));
+ }
+ return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ }
+
+ private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
@Test
@@ -711,26 +712,28 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValueAndWindow() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 4L),
- new Tuple2<String, Long>("new york", 4L)),
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("world", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("moon", 1L)));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
JavaPairDStream<String, Long> counted =
- pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
+ stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -897,7 +900,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
- */
+
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -912,7 +915,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(8,7));
File tempDir = Files.createTempDir();
- ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+ ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@@ -964,7 +967,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result1);
}
*/
- /*
+
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@@ -972,15 +975,15 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@Test
public void testNetworkTextStream() {
- JavaDStream test = ssc.networkTextStream("localhost", 12345);
+ JavaDStream test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1000,7 +1003,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = ssc.networkStream(
+ JavaDStream test = ssc.socketStream(
"localhost",
12345,
new Converter(),
@@ -1026,5 +1029,5 @@ public class JavaAPISuite implements Serializable {
public void testFileStream() {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
- }*/
+ }
}
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 56349837e5..52ea28732a 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase {
}
object JavaTestUtils extends JavaTestBase {
+ override def maxWaitTimeMillis = 20000
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index edfa1243fa..59c445e63f 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,6 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index c031949dd1..8fce91853c 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,6 +6,8 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework() = "BasicOperationsSuite"
after {
@@ -22,7 +24,7 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
- test("flatmap") {
+ test("flatMap") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
input,
@@ -86,6 +88,23 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
+ test("count") {
+ testOperation(
+ Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+ (s: DStream[Int]) => s.count(),
+ Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+ )
+ }
+
+ test("countByValue") {
+ testOperation(
+ Seq(1 to 1, Seq(1, 1, 1), 1 to 2, Seq(1, 1, 2, 2)),
+ (s: DStream[Int]) => s.countByValue(),
+ Seq(Seq((1, 1L)), Seq((1, 3L)), Seq((1, 1L), (2, 1L)), Seq((2, 2L), (1, 2L))),
+ true
+ )
+ }
+
test("mapValues") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
@@ -204,12 +223,32 @@ class BasicOperationsSuite extends TestSuiteBase {
case _ => Option(stateObj)
}
}
- s.map(_ -> 1).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
+ s.map(x => (x, 1)).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
}
testOperation(inputData, updateStateOperation, outputData, true)
}
+ test("slice") {
+ val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1))
+ val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
+ val stream = new TestInputStream[Int](ssc, input, 2)
+ ssc.registerInputStream(stream)
+ stream.foreach(_ => {}) // Dummy output stream
+ ssc.start()
+ Thread.sleep(2000)
+ def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
+ stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
+ }
+
+ assert(getInputFromSlice(0, 1000) == Set(1))
+ assert(getInputFromSlice(0, 2000) == Set(1, 2))
+ assert(getInputFromSlice(1000, 2000) == Set(1, 2))
+ assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4))
+ ssc.stop()
+ Thread.sleep(1000)
+ }
+
test("forgetting of RDDs - map and window operations") {
assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 7126af62d9..cac86deeaf 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
@@ -10,8 +11,16 @@ import util.{Clock, ManualClock}
import scala.util.Random
import com.google.common.io.Files
+
+/**
+ * This test suites tests the checkpointing functionality of DStreams -
+ * the checkpointing of a DStream's RDDs as well as the checkpointing of
+ * the whole DStream graph.
+ */
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
@@ -30,21 +39,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def batchDuration = Milliseconds(500)
- override def checkpointInterval = batchDuration
-
override def actuallyWait = true
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
- assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
// this ensure checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
val secondNumBatches = firstNumBatches
// Setup the streams
@@ -64,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
- runStreamsWithRealDelay(ssc, firstNumBatches)
+ advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
@@ -77,7 +83,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
- runStreamsWithRealDelay(ssc, secondNumBatches)
+ advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -92,7 +98,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
- runStreamsWithRealDelay(ssc, 1)
+ advanceTimeWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
@@ -113,7 +119,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
- runStreamsWithRealDelay(ssc, 4)
+ advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
@@ -168,74 +174,95 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
// This tests whether file input stream remembers what files were seen before
- // the master failure and uses them again to process a large window operatoin.
+ // the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ val clockProperty = System.getProperty("spark.streaming.clock")
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- var ssc = new StreamingContext(master, framework, batchDuration)
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ var ssc = new StreamingContext(master, framework, Seconds(1))
+ ssc.checkpoint(checkpointDir)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
val mappedStream = fileStream.map(s => {
val i = s.toInt
- if (i == 3) Thread.sleep(1000)
+ if (i == 3) Thread.sleep(2000)
i
})
+
// Reducing over a large window to ensure that recovery from master failure
// requires reprocessing of all the files seen before the failure
- val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files and advance manual clock to process them
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
// wait to make sure that the file is written such that it gets shown in the file listings
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- // wait to make sure that FileInputDStream picks up this file only and not any other file
- Thread.sleep(500)
+ Thread.sleep(1000)
}
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
+ // Verify whether files created have been recorded correctly or not
+ var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(1000)
}
- // Restart stream computation from checkpoint and create more files to see whether
- // they are being processed
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
+ fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Restart stream computation
ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(500)
+ Thread.sleep(1000)
}
Thread.sleep(1000)
- logInfo("Output = " + outputStream.output.mkString(","))
+ logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
+ // Verify whether files created while the driver was down have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
+
+ // Verify whether new files created after recover have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
+
// Append the new output to the old buffer
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
outputBuffer ++= outputStream.output
- // Verify whether data received by Spark Streaming was as expected
- val expectedOutput = Seq(1, 3, 6, 28, 36, 45)
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
logInfo("--------------------------------")
logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -244,11 +271,17 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
- }
+ val output = outputBuffer.flatMap(x => x)
+ assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
+ output.foreach(o => // To ensure all the inputs are correctly added cumulatively
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ // To ensure that all the inputs were received correctly
+ assert(expectedOutput.last === output.last)
+
+ // Enable manual clock back again for other tests
+ if (clockProperty != null)
+ System.setProperty("spark.streaming.clock", clockProperty)
}
@@ -278,7 +311,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
- val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
+ ssc.start()
+ val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
+ ssc.stop()
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -289,17 +324,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc.stop()
ssc = null
}
/**
* Advances the manual clock on the streaming scheduler by given number of batches.
- * It also wait for the expected amount of time for each batch.
+ * It also waits for the expected amount of time for each batch.
*/
- def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
+ def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
@@ -308,6 +346,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- }
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ outputStream.output
+ }
} \ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index c4cfffbfc1..a5fa7ab92d 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,191 +1,40 @@
package spark.streaming
-import org.scalatest.BeforeAndAfter
-import org.apache.commons.io.FileUtils
+import spark.Logging
+import spark.streaming.util.MasterFailureTest
+import StreamingContext._
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
+import com.google.common.io.Files
import java.io.File
-import scala.runtime.RichInt
-import scala.util.Random
-import spark.streaming.StreamingContext._
+import org.apache.commons.io.FileUtils
import collection.mutable.ArrayBuffer
-import spark.Logging
+
/**
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
-class FailureSuite extends TestSuiteBase with BeforeAndAfter {
+class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+
+ var directory = "FailureSuite"
+ val numBatches = 30
+ val batchDuration = Milliseconds(1000)
before {
- FileUtils.deleteDirectory(new File(checkpointDir))
+ FileUtils.deleteDirectory(new File(directory))
}
after {
- FailureSuite.reset()
- FileUtils.deleteDirectory(new File(checkpointDir))
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- }
-
- override def framework = "CheckpointSuite"
-
- override def batchDuration = Milliseconds(500)
-
- override def checkpointDir = "checkpoint"
-
- override def checkpointInterval = batchDuration
-
- test("multiple failures with updateStateByKey") {
- val n = 30
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ] for n=30
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
-
- val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
- }
- st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
- .map(t => (t._1, t._2.self))
- }
-
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
- }
-
- test("multiple failures with reduceByKeyAndWindow") {
- val n = 30
- val w = 100
- assert(w > n, "Window should be much larger than the number of input sets in this test")
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ]
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
-
- val operation = (st: DStream[String]) => {
- st.map(x => (x, 1))
- .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
- }
-
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
- }
-
-
- /**
- * Tests stream operation with multiple master failures, and verifies whether the
- * final set of output values is as expected or not. Checking the final value is
- * proof that no intermediate data was lost due to master failures.
- */
- def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest](
- input: Seq[Seq[U]],
- operation: DStream[U] => DStream[V],
- lastExpectedOutput: Seq[V],
- numBatches: Int,
- numExpectedOutput: Int
- ) {
- var ssc = setupStreams[U, V](input, operation)
- val mergedOutput = new ArrayBuffer[Seq[V]]()
-
- var totalTimeRan = 0L
- while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) {
- new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start()
- val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput)
-
- mergedOutput ++= output
- totalTimeRan += timeRan
- logInfo("New output = " + output)
- logInfo("Merged output = " + mergedOutput)
- logInfo("Total time spent = " + totalTimeRan)
- val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8)
- logInfo(
- "\n-------------------------------------------\n" +
- " Restarting stream computation in " + sleepTime + " ms " +
- "\n-------------------------------------------\n"
- )
- Thread.sleep(sleepTime)
- FailureSuite.failed = false
- ssc = new StreamingContext(checkpointDir)
- }
- ssc.stop()
- ssc = null
-
- // Verify whether the last output is the expected one
- val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty))
- assert(lastOutput.toSet === lastExpectedOutput.toSet)
- logInfo("Finished computation after " + FailureSuite.failureCount + " failures")
+ FileUtils.deleteDirectory(new File(directory))
}
- /**
- * Runs the streams set up in `ssc` on real clock until the expected max number of
- */
- def runStreamsWithRealClock[V: ClassManifest](
- ssc: StreamingContext,
- numBatches: Int,
- maxExpectedOutput: Int
- ): (Seq[Seq[V]], Long) = {
-
- System.clearProperty("spark.streaming.clock")
-
- assert(numBatches > 0, "Number of batches to run stream computation is zero")
- assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero")
- logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput)
-
- // Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
- val output = outputStream.output
- val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
- val startTime = System.currentTimeMillis()
-
- try {
- // Start computation
- ssc.start()
-
- // Wait until expected number of output items have been generated
- while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) {
- logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput)
- Thread.sleep(100)
- }
- } catch {
- case e: Exception => logInfo("Exception while running streams: " + e)
- } finally {
- ssc.stop()
- }
- val timeTaken = System.currentTimeMillis() - startTime
- logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
- (output, timeTaken)
+ test("multiple failures with map") {
+ MasterFailureTest.testMap(directory, numBatches, batchDuration)
}
-
-}
-
-object FailureSuite {
- var failed = false
- var failureCount = 0
-
- def reset() {
- failed = false
- failureCount = 0
+ test("multiple failures with updateStateByKey") {
+ MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
}
}
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging {
- initLogging()
-
- override def run() {
- var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint
- val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime)
- logInfo("Kill wait time = " + killWaitTime)
- Thread.sleep(killWaitTime.toLong)
- logInfo(
- "\n---------------------------------------\n" +
- "Killing streaming context after " + killWaitTime + " ms" +
- "\n---------------------------------------\n"
- )
- if (ssc != null) ssc.stop()
- FailureSuite.failed = true
- FailureSuite.failureCount += 1
- }
-}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index c442210004..c9f941c5b8 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,5 +1,11 @@
package spark.streaming
+import akka.actor.Actor
+import akka.actor.IO
+import akka.actor.IOManager
+import akka.actor.Props
+import akka.util.ByteString
+
import dstream.SparkFlumeEvent
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
@@ -7,6 +13,7 @@ import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import spark.storage.StorageLevel
+import spark.streaming.receivers.Receiver
import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
@@ -25,6 +32,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ val testPort = 9999
+
override def checkpointDir = "checkpoint"
after {
@@ -35,13 +44,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("network input stream") {
// Start the server
- val testPort = 9999
val testServer = new TestServer(testPort)
testServer.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
- val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
@@ -95,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
-
+ Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333));
val client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver);
@@ -133,26 +141,29 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
- val filestream = ssc.textFileStream(testDir.toString)
+ val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
- val outputStream = new TestOutputStream(filestream, outputBuffer)
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- //Thread.sleep(100)
+ val file = new File(testDir, i.toString)
+ FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ logInfo("Created file " + file)
+ Thread.sleep(batchDuration.milliseconds)
+ Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
Thread.sleep(1000)
@@ -171,16 +182,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
+ assert(output.toList === expectedOutput.toList)
+
+ FileUtils.deleteDirectory(testDir)
+
+ // Enable manual clock back again for other tests
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
+
+ test("actor input stream") {
+ // Start the server
+ val port = testPort
+ val testServer = new TestServer(port)
+ testServer.start()
+
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
+ StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ def output = outputBuffer.flatMap(x => x)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Feed data to the server to send to the network receiver
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = 1 to 9
+ val expectedOutput = input.map(x => x.toString)
+ Thread.sleep(1000)
+ for (i <- 0 until input.size) {
+ testServer.send(input(i).toString)
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
+ logInfo("Stopping server")
+ testServer.stop()
+ logInfo("Stopping context")
+ ssc.stop()
+
+ // Verify whether data received was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputBuffer.size)
+ logInfo("output")
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
- assert(output(i).size === 1)
- assert(output(i).head.toString === expectedOutput(i))
+ assert(output(i) === expectedOutput(i))
}
- FileUtils.deleteDirectory(testDir)
}
}
+/** This is server to test the network input stream */
class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
@@ -239,3 +302,15 @@ object TestServer {
}
}
}
+
+class TestActor(port: Int) extends Actor with Receiver {
+
+ def bytesToString(byteString: ByteString) = byteString.utf8String
+
+ override def preStart = IOManager(context.system).connect(new InetSocketAddress(port))
+
+ def receive = {
+ case IO.Read(socket, bytes) =>
+ pushBlock(bytesToString(bytes))
+ }
+}
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index c2733831b2..ad6aa79d10 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -63,20 +63,25 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
*/
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+ // Name of the framework for Spark context
def framework = "TestSuiteBase"
+ // Master for Spark context
def master = "local[2]"
+ // Batch duration
def batchDuration = Seconds(1)
+ // Directory where the checkpoint data will be saved
def checkpointDir = "checkpoint"
- def checkpointInterval = batchDuration
-
+ // Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
+ // Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
+ // Whether to actually wait in real time before changing manual clock
def actuallyWait = false
/**
@@ -91,7 +96,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
@@ -116,7 +121,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
@@ -140,9 +145,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
@@ -186,7 +188,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} finally {
ssc.stop()
}
-
output
}
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index cd9608df53..1b66f3bda2 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework = "WindowOperationsSuite"
override def maxWaitTimeMillis = 20000
@@ -82,12 +84,9 @@ class WindowOperationsSuite extends TestSuiteBase {
)
/*
- The output of the reduceByKeyAndWindow with inverse reduce function is
- different from the naive reduceByKeyAndWindow. Even if the count of a
- particular key is 0, the key does not get eliminated from the RDDs of
- ReducedWindowedDStream. This causes the number of keys in these RDDs to
- increase forever. A more generalized version that allows elimination of
- keys should be considered.
+ The output of the reduceByKeyAndWindow with inverse function but without a filter
+ function will be different from the naive reduceByKeyAndWindow, as no keys get
+ eliminated from the ReducedWindowedDStream even if the value of a key becomes 0.
*/
val bigReduceInvOutput = Seq(
@@ -175,31 +174,31 @@ class WindowOperationsSuite extends TestSuiteBase {
// Testing reduceByKeyAndWindow (with invertible reduce function)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"basic reduction",
Seq(Seq(("a", 1), ("a", 3)) ),
Seq(Seq(("a", 4)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key already in window and new value added into window",
Seq( Seq(("a", 1)), Seq(("a", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"new key added into window",
Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key removed from window",
Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"larger slide time",
largerSlideInput,
largerSlideReduceOutput,
@@ -207,7 +206,9 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(2)
)
- testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+ testReduceByKeyAndWindowWithInverse("big test", bigInput, bigReduceInvOutput)
+
+ testReduceByKeyAndWindowWithFilteredInverse("big test", bigInput, bigReduceOutput)
test("groupByKeyAndWindow") {
val input = bigInput
@@ -235,14 +236,14 @@ class WindowOperationsSuite extends TestSuiteBase {
testOperation(input, operation, expectedOutput, numBatches, true)
}
- test("countByKeyAndWindow") {
- val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
+ test("countByValueAndWindow") {
+ val input = Seq(Seq("a"), Seq("b", "b"), Seq("a", "b"))
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowDuration = Seconds(2)
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
- val operation = (s: DStream[(String, Int)]) => {
- s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
+ val operation = (s: DStream[String]) => {
+ s.countByValueAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -272,29 +273,50 @@ class WindowOperationsSuite extends TestSuiteBase {
slideDuration: Duration = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
+ logInfo("reduceByKeyAndWindow - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
+ s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
- def testReduceByKeyAndWindowInv(
+ def testReduceByKeyAndWindowWithInverse(
name: String,
input: Seq[Seq[(String, Int)]],
expectedOutput: Seq[Seq[(String, Int)]],
windowDuration: Duration = Seconds(2),
slideDuration: Duration = Seconds(1)
) {
- test("reduceByKeyAndWindowInv - " + name) {
+ test("reduceByKeyAndWindow with inverse function - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse function - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
- .persist()
.checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
+
+ def testReduceByKeyAndWindowWithFilteredInverse(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowDuration: Duration = Seconds(2),
+ slideDuration: Duration = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse and filter functions - " + name)
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+ val filterFunc = (p: (String, Int)) => p._2 != 0
+ val operation = (s: DStream[(String, Int)]) => {
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc)
+ .persist()
+ .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
}