-rw-r--r--  bagel/src/test/resources/log4j.properties | 4
-rwxr-xr-x  conf/streaming-env.sh.template | 22
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala | 4
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 4
-rw-r--r--  core/src/main/scala/spark/RDDCheckpointData.scala | 14
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 2
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala | 7
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashSet.scala | 5
-rw-r--r--  core/src/test/resources/log4j.properties | 4
-rw-r--r--  docs/streaming-programming-guide.md | 14
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala (renamed from streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala) | 2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala | 36
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala (renamed from streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala) | 0
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala (renamed from streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala) | 17
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/QueueStream.scala (renamed from streaming/src/main/scala/spark/streaming/examples/QueueStream.scala) | 0
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala | 46
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala (renamed from streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala) | 0
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala (renamed from streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala) | 0
-rw-r--r--  project/SparkBuild.scala | 2
-rw-r--r--  repl/src/test/resources/log4j.properties | 4
-rwxr-xr-x  run | 4
-rw-r--r--  sentences.txt | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 38
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala | 16
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStream.scala | 46
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala | 75
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 33
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 49
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala | 25
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 43
-rw-r--r--  streaming/src/test/resources/log4j.properties | 4
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 2
35 files changed, 181 insertions, 356 deletions
diff --git a/bagel/src/test/resources/log4j.properties b/bagel/src/test/resources/log4j.properties
index 4c99e450bc..83d05cab2f 100644
--- a/bagel/src/test/resources/log4j.properties
+++ b/bagel/src/test/resources/log4j.properties
@@ -1,8 +1,8 @@
-# Set everything to be logged to the console
+# Set everything to be logged to the file bagel/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=spark-tests.log
+log4j.appender.file.file=bagel/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/conf/streaming-env.sh.template b/conf/streaming-env.sh.template
deleted file mode 100755
index 1ea9ba5541..0000000000
--- a/conf/streaming-env.sh.template
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env bash
-
-# This file contains a few additional setting that are useful for
-# running streaming jobs in Spark. Copy this file as streaming-env.sh .
-# Note that this shell script will be read after spark-env.sh, so settings
-# in this file may override similar settings (if present) in spark-env.sh .
-
-
-# Using concurrent GC is strongly recommended as it can significantly
-# reduce GC related pauses.
-
-SPARK_JAVA_OPTS+=" -XX:+UseConcMarkSweepGC"
-
-# Using Kryo serialization can improve serialization performance
-# and therefore the throughput of the Spark Streaming programs. However,
-# using Kryo serialization with custom classes may required you to
-# register the classes with Kryo. Refer to the Spark documentation
-# for more details.
-
-# SPARK_JAVA_OPTS+=" -Dspark.serializer=spark.KryoSerializer"
-
-export SPARK_JAVA_OPTS
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 7d320c4fe5..86ad737583 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
private val slaveCapacity = new HashMap[String, Long]
private val slaveUsage = new HashMap[String, Long]
- private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.cleanup)
+ private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.clearOldValues)
private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
@@ -120,7 +120,7 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
// Remembers which splits are currently being loaded (on worker nodes)
val loading = new HashSet[String]
- val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.cleanup)
+ val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.clearOldValues)
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 5ebdba0fc8..a2fa2d1ea7 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -178,8 +178,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
def cleanup(cleanupTime: Long) {
- mapStatuses.cleanup(cleanupTime)
- cachedSerializedStatuses.cleanup(cleanupTime)
+ mapStatuses.clearOldValues(cleanupTime)
+ cachedSerializedStatuses.clearOldValues(cleanupTime)
}
def stop() {
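
The clearOldValues rename above standardizes the cleanup hook that MetadataCleaner drives. A minimal sketch of that pattern, assuming (as the call sites in this diff suggest) that MetadataCleaner takes a name and a Long => Unit callback and that both classes live in spark.util; TrackerLikeCache is a hypothetical name:

import spark.util.{MetadataCleaner, TimeStampedHashMap}

// Sketch only: a tracker-style cache whose stale entries are dropped periodically.
class TrackerLikeCache {
  // Entries are timestamped on insertion.
  private val statuses = new TimeStampedHashMap[Int, Array[Byte]]

  // The cleaner periodically calls clearOldValues with a threshold time,
  // removing entries whose insertion timestamp is older than that threshold.
  private val metadataCleaner = new MetadataCleaner("TrackerLikeCache", statuses.clearOldValues)
}
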
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index 7af830940f..d845a522e4 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -14,15 +14,23 @@ private[spark] object CheckpointState extends Enumeration {
}
/**
- * This class contains all the information of the regarding RDD checkpointing.
+ * This class contains all the information related to RDD checkpointing. Each instance of this class
+ * is associated with an RDD. It manages the checkpointing process of the associated RDD, and also
+ * manages the post-checkpoint state by providing the updated splits, iterator and preferred locations
+ * of the checkpointed RDD.
*/
private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
extends Logging with Serializable {
import CheckpointState._
+ // The checkpoint state of the associated RDD.
var cpState = Initialized
+
+ // The file to which the associated RDD has been checkpointed
@transient var cpFile: Option[String] = None
+
+ // The CheckpointRDD created from the checkpoint file, that is, the new parent of the associated RDD.
@transient var cpRDD: Option[RDD[T]] = None
// Mark the RDD for checkpointing
@@ -65,7 +73,7 @@ extends Logging with Serializable {
cpRDD = Some(newRDD)
rdd.changeDependencies(newRDD)
cpState = Checkpointed
- RDDCheckpointData.checkpointCompleted()
+ RDDCheckpointData.clearTaskCaches()
logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
}
}
@@ -90,7 +98,7 @@ extends Logging with Serializable {
}
private[spark] object RDDCheckpointData {
- def checkpointCompleted() {
+ def clearTaskCaches() {
ShuffleMapTask.clearCache()
ResultTask.clearCache()
}
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 1a88d402c3..86c63ca2f4 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -13,6 +13,10 @@ private[spark] class CheckpointRDDSplit(idx: Int, val splitFile: String) extends
override val index: Int = idx
}
+/**
+ * This RDD represents an RDD checkpoint file (similar to HadoopRDD).
+ */
+private[spark]
class CheckpointRDD[T: ClassManifest](sc: SparkContext, checkpointPath: String)
extends RDD[T](sc, Nil) {
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9387ba19a3..59f2099e91 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -599,15 +599,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def cleanup(cleanupTime: Long) {
var sizeBefore = idToStage.size
- idToStage.cleanup(cleanupTime)
+ idToStage.clearOldValues(cleanupTime)
logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
sizeBefore = shuffleToMapStage.size
- shuffleToMapStage.cleanup(cleanupTime)
+ shuffleToMapStage.clearOldValues(cleanupTime)
logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
sizeBefore = pendingTasks.size
- pendingTasks.cleanup(cleanupTime)
+ pendingTasks.clearOldValues(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 7ec6564105..74a63c1af1 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -12,7 +12,7 @@ private[spark] object ResultTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.cleanup)
+ val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index feb63abb61..19f5328eee 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -23,7 +23,7 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.cleanup)
+ val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index 7e785182ea..bb7c5c01c8 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -7,7 +7,7 @@ import scala.collection.mutable.Map
/**
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
* time stamp along with each key-value pair. Key-value pairs that are older than a particular
- * threshold time can them be removed using the cleanup method. This is intended to be a drop-in
+ * threshold time can then be removed using the clearOldValues method. This is intended to be a drop-in
* replacement of scala.collection.mutable.HashMap.
*/
class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
@@ -74,7 +74,10 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
}
}
- def cleanup(threshTime: Long) {
+ /**
+ * Removes old key-value pairs that have a timestamp earlier than `threshTime`.
+ */
+ def clearOldValues(threshTime: Long) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
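
For illustration, a small standalone sketch of clearOldValues, assuming TimeStampedHashMap is in the spark.util package (as the file path above indicates) and behaves as the drop-in mutable map described in its scaladoc; TimeStampedHashMapDemo is a hypothetical name:

import spark.util.TimeStampedHashMap

object TimeStampedHashMapDemo extends App {
  // Each entry carries its insertion timestamp.
  val cache = new TimeStampedHashMap[String, Int]
  cache("old") = 1
  Thread.sleep(100)
  cache("new") = 2

  // Drop everything inserted more than 50 ms ago; only "new" should remain.
  cache.clearOldValues(System.currentTimeMillis() - 50)
  assert(cache.get("old").isEmpty)
  assert(cache.get("new").isDefined)
}
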
diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
index 539dd75844..5f1cc93752 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashSet.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
@@ -52,7 +52,10 @@ class TimeStampedHashSet[A] extends Set[A] {
}
}
- def cleanup(threshTime: Long) {
+ /**
+ * Removes old values that have a timestamp earlier than `threshTime`.
+ */
+ def clearOldValues(threshTime: Long) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 5ed388e91b..6ec89c0184 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -1,8 +1,8 @@
-# Set everything to be logged to the file spark-tests.log
+# Set everything to be logged to the file core/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=spark-tests.log
+log4j.appender.file.file=core/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index fc2ea2ef79..05a88ce7bd 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -187,8 +187,8 @@ Conversely, the computation can be stopped by using
ssc.stop()
{% endhighlight %}
-# Example - WordCountNetwork.scala
-A good example to start off is the spark.streaming.examples.WordCountNetwork. This example counts the words received from a network server every second. Given below is the relevant sections of the source code. You can find the full source code in <Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala.
+# Example - NetworkWordCount.scala
+A good example to start off is the spark.streaming.examples.NetworkWordCount. This example counts the words received from a network server every second. Given below are the relevant sections of the source code. You can find the full source code in <Spark repo>/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala.
{% highlight scala %}
import spark.streaming.{Seconds, StreamingContext}
@@ -196,7 +196,7 @@ import spark.streaming.StreamingContext._
...
// Create the context and set up a network input stream to receive from a host:port
-val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
+val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
val lines = ssc.networkTextStream(args(1), args(2).toInt)
// Split the lines into words, count them, and print some of the counts on the master
@@ -214,13 +214,13 @@ To run this example on your local machine, you need to first run a Netcat server
$ nc -lk 9999
{% endhighlight %}
-Then, in a different terminal, you can start WordCountNetwork by using
+Then, in a different terminal, you can start NetworkWordCount by using
{% highlight bash %}
-$ ./run spark.streaming.examples.WordCountNetwork local[2] localhost 9999
+$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999
{% endhighlight %}
-This will make WordCountNetwork connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen.
+This will make NetworkWordCount connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen.
<table>
<td>
@@ -240,7 +240,7 @@ hello world
</td>
<td>
{% highlight bash %}
-# TERMINAL 2: RUNNING WordCountNetwork
+# TERMINAL 2: RUNNING NetworkWordCount
...
2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s
-------------------------------------------
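
Putting the guide's snippets together, a minimal end-to-end NetworkWordCount looks roughly like this; the master URL and port are illustrative, MiniNetworkWordCount is a hypothetical name, and the real example is in the renamed file later in this diff:

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object MiniNetworkWordCount {
  def main(args: Array[String]) {
    // Create the context with a 1 second batch size and connect to a local netcat server.
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
    val lines = ssc.networkTextStream("localhost", 9999)

    // Split the lines into words, count them per batch, and print a sample of the counts.
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
  }
}
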
diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
index e60ce483a3..461929fba2 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -5,7 +5,7 @@ import spark.storage.StorageLevel
import spark.streaming._
/**
- * Produce a streaming count of events received from Flume.
+ * Produces a count of events received from Flume.
*
* This should be used in conjunction with an AvroSink in Flume. It will start
* an Avro server on at the request host:port address and listen for requests.
diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
new file mode 100644
index 0000000000..8530f5c175
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
@@ -0,0 +1,36 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+
+/**
+ * Counts words in new text files created in the given directory
+ * Usage: HdfsWordCount <master> <directory>
+ * <master> is the Spark master URL.
+ * <directory> is the directory that Spark Streaming will use to find and read new text files.
+ *
+ * To run this on your local machine with the directory `localdir`, run this example
+ * `$ ./run spark.streaming.examples.HdfsWordCount local[2] localdir`
+ * Then create a text file in `localdir` and the words in the file will get counted.
+ */
+object HdfsWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: HdfsWordCount <master> <directory>")
+ System.exit(1)
+ }
+
+ // Create the context
+ val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2))
+
+ // Create the FileInputDStream on the directory and use the
+ // stream to count words in new files created
+ val lines = ssc.textFileStream(args(1))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index fe55db6e2c..fe55db6e2c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index eadda60563..43c01d5db2 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -3,16 +3,27 @@ package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._
-object WordCountNetwork {
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: NetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming will connect to in order to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ */
+object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+ System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
// Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
// Create a NetworkInputDStream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
index 2a265d021d..2a265d021d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
new file mode 100644
index 0000000000..2eec777c54
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -0,0 +1,46 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+
+import spark.streaming._
+import spark.streaming.util.RawTextHelper
+
+/**
+ * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
+ * lines have the word 'the' in them. This is useful for benchmarking purposes. This
+ * will only work with spark.streaming.util.RawTextSender running on all worker nodes
+ * and with Spark using Kryo serialization (set Java property "spark.serializer" to
+ * "spark.KryoSerializer").
+ * Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>
+ * <master> is the Spark master URL
+ *   <numStreams> is the number of rawNetworkStreams, which should be the same as the number
+ *   of worker nodes in the cluster
+ *   <host> is "localhost".
+ *   <port> is the port on which RawTextSender is running on the worker nodes.
+ *   <batchMillis> is the Spark Streaming batch duration in milliseconds.
+ */
+
+object RawNetworkGrep {
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
+
+ // Create the context
+ val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ RawTextHelper.warmUp(ssc.sc)
+
+ val rawStreams = (1 to numStreams).map(_ =>
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+ val union = ssc.union(rawStreams)
+ union.filter(_.contains("the")).count().foreach(r =>
+ println("Grep count: " + r.collect().mkString))
+ ssc.start()
+ }
+}
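
As the scaladoc above notes, this benchmark expects Kryo serialization. A sketch of enabling it programmatically before the context is created, equivalent to the -Dspark.serializer=spark.KryoSerializer option shown in the removed streaming-env.sh.template earlier in this diff; setting it via SPARK_JAVA_OPTS instead is a deployment choice:

// Must be set before the StreamingContext (and its SparkContext) is created.
System.setProperty("spark.serializer", "spark.KryoSerializer")
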
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
index 4c6e08bc74..4c6e08bc74 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index a191321d91..a191321d91 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a3f901a081..6ba3026bcc 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -23,7 +23,7 @@ object SparkBuild extends Build {
lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core)
- lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core)
+ lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming)
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core)
diff --git a/repl/src/test/resources/log4j.properties b/repl/src/test/resources/log4j.properties
index 4c99e450bc..cfb1a390e6 100644
--- a/repl/src/test/resources/log4j.properties
+++ b/repl/src/test/resources/log4j.properties
@@ -1,8 +1,8 @@
-# Set everything to be logged to the console
+# Set everything to be logged to the file repl/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=spark-tests.log
+log4j.appender.file.file=repl/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/run b/run
index 27506c33e2..2f61cb2a87 100755
--- a/run
+++ b/run
@@ -13,10 +13,6 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
-if [ -e $FWDIR/conf/streaming-env.sh ] ; then
- . $FWDIR/conf/streaming-env.sh
-fi
-
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
if [ `command -v scala` ]; then
RUNNER="scala"
diff --git a/sentences.txt b/sentences.txt
deleted file mode 100644
index fedf96c66e..0000000000
--- a/sentences.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-Hello world!
-What's up?
-There is no cow level
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 7256e41af9..215246ba2e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -154,7 +154,7 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -192,7 +192,7 @@ class StreamingContext private (
storageLevel: StorageLevel
): DStream[T] = {
val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -208,7 +208,7 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -228,13 +228,14 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@@ -244,16 +245,37 @@ class StreamingContext private (
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
- ](directory: String): DStream[(K, V)] = {
+ ] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new files
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Whether to process only new files and ignore existing files in the directory
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ F <: NewInputFormat[K, V]: ClassManifest
+ ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+ val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
+ registerInputStream(inputStream)
inputStream
}
+
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
- * as Text and input format as TextInputFormat).
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
@@ -274,7 +296,7 @@ class StreamingContext private (
defaultRDD: RDD[T] = null
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
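
A hypothetical call site for the new fileStream overload. The Hadoop types mirror those documented for textFileStream (LongWritable key, Text value, TextInputFormat), while the FileStreamDemo name, the HDFS path, and the .log filter are illustrative:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

import spark.streaming.{Seconds, StreamingContext}

object FileStreamDemo {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "FileStreamDemo", Seconds(2))

    // Only pick up files ending in .log, and ignore files already present in the directory.
    val logLines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "hdfs://namenode:9000/incoming",
      (path: Path) => path.getName.endsWith(".log"),
      newFilesOnly = true
    ).map(pair => pair._2.toString)

    logLines.print()
    ssc.start()
  }
}
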
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index cf72095324..1e6ad84b44 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -14,7 +14,7 @@ private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
- filter: PathFilter = FileInputDStream.defaultPathFilter,
+ filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
@@ -60,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter.accept(path)) {
+ if (!filter(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
@@ -95,16 +95,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
+private[streaming]
object FileInputDStream {
- val defaultPathFilter = new PathFilter with Serializable {
- def accept(path: Path): Boolean = {
- val file = path.getName()
- if (file.startsWith(".") || file.endsWith("_tmp")) {
- return false
- } else {
- return true
- }
- }
- }
+ def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
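
Since the filter parameter is now a plain Path => Boolean rather than a Hadoop PathFilter, any existing PathFilter can be adapted with a one-line wrapper; asPathPredicate is a hypothetical helper, not part of this change:

import org.apache.hadoop.fs.{Path, PathFilter}

// Adapts a Hadoop PathFilter to the Path => Boolean parameter now expected by
// FileInputDStream and StreamingContext.fileStream.
def asPathPredicate(filter: PathFilter): Path => Boolean =
  (path: Path) => filter.accept(path)
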
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
deleted file mode 100644
index 81938d30d4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.StreamingContext
-import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-
-object FileStream {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: FileStream <master> <new HDFS compatible directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
-
- // Create the new directory
- val directory = new Path(args(1))
- val fs = directory.getFileSystem(new Configuration())
- if (fs.exists(directory)) throw new Exception("This directory already exists")
- fs.mkdirs(directory)
- fs.deleteOnExit(directory)
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val inputStream = ssc.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
-
- // Creating new files in the directory
- val text = "This is a text file"
- for (i <- 1 to 30) {
- ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10)
- .saveAsTextFile(new Path(directory, i.toString).toString)
- Thread.sleep(1000)
- }
- Thread.sleep(5000) // Waiting for the file to be processed
- ssc.stop()
- System.exit(0)
- }
-} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
deleted file mode 100644
index b7bc15a1d5..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-object FileStreamWithCheckpoint {
-
- def main(args: Array[String]) {
-
- if (args.size != 3) {
- println("FileStreamWithCheckpoint <master> <directory> <checkpoint dir>")
- println("FileStreamWithCheckpoint restart <directory> <checkpoint dir>")
- System.exit(-1)
- }
-
- val directory = new Path(args(1))
- val checkpointDir = args(2)
-
- val ssc: StreamingContext = {
-
- if (args(0) == "restart") {
-
- // Recreated streaming context from specified checkpoint file
- new StreamingContext(checkpointDir)
-
- } else {
-
- // Create directory if it does not exist
- val fs = directory.getFileSystem(new Configuration())
- if (!fs.exists(directory)) fs.mkdirs(directory)
-
- // Create new streaming context
- val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
- ssc_.checkpoint(checkpointDir)
-
- // Setup the streaming computation
- val inputStream = ssc_.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
-
- ssc_
- }
- }
-
- // Start the stream computation
- startFileWritingThread(directory.toString)
- ssc.start()
- }
-
- def startFileWritingThread(directory: String) {
-
- val fs = new Path(directory).getFileSystem(new Configuration())
-
- val fileWritingThread = new Thread() {
- override def run() {
- val r = new scala.util.Random()
- val text = "This is a sample text file with a random number "
- while(true) {
- val number = r.nextInt()
- val file = new Path(directory, number.toString)
- val fos = fs.create(file)
- fos.writeChars(text + number)
- fos.close()
- println("Created text file " + file)
- Thread.sleep(1000)
- }
- }
- }
- fileWritingThread.start()
- }
-
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
deleted file mode 100644
index dfaaf03f03..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-object GrepRaw {
- def main(args: Array[String]) {
- if (args.length != 5) {
- System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
- // Create the context
- val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
-
- val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
- val union = ssc.union(rawStreams)
- union.filter(_.contains("Alice")).count().foreach(r =>
- println("Grep count: " + r.collect().mkString))
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
deleted file mode 100644
index 338834bc3c..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object TopKWordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- val k = 10
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- /*warmUp(ssc.sc)*/
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate top K words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = ssc.union(lines)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
- partialTopKWindowedCounts.foreach(rdd => {
- val collectedCounts = rdd.collect
- println("Collected " + collectedCounts.size + " words from partial top words")
- println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
- })
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
deleted file mode 100644
index 867a8f42c4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountHdfs {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCountHdfs <master> <directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val lines = ssc.textFileStream(args(1))
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
deleted file mode 100644
index d93335a8ce..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object WordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate count of words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = ssc.union(lines)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- windowedCounts.foreach(r => println("# unique words = " + r.count()))
-
- ssc.start()
- }
-}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 33bafebaab..edfa1243fa 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,8 +1,8 @@
-# Set everything to be logged to the file streaming-tests.log
+# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming-tests.log
+log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 76b528bec3..e71ba6ddc1 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -318,7 +318,7 @@ class TestServer(port: Int) extends Logging {
}
}
} catch {
- case e: SocketException => println(e)
+ case e: SocketException => logError("TestServer error", e)
} finally {
logInfo("Connection closed")
if (!clientSocket.isClosed) clientSocket.close()