author     Tathagata Das <tathagata.das1565@gmail.com>   2013-01-07 14:37:21 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2013-01-07 14:37:21 -0800
commit     237bac36e9dca8828192994dad323b8da1619267 (patch)
tree       f01be9fa6590b2e1604e4791dc720ccad28e2fea /examples
parent     1346126485444afc065bf4951c4bedebe5c95ce4 (diff)
Renamed examples and added documentation.
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/FileStream.scala               |  46
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala |  75
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala          |   2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/GrepRaw.scala                  |  32
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala            |  36
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala (renamed from examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala) | 17
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala           |  46
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala         |  49
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala            |  25
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala             |  43
10 files changed, 97 insertions(+), 274 deletions(-)
diff --git a/examples/src/main/scala/spark/streaming/examples/FileStream.scala b/examples/src/main/scala/spark/streaming/examples/FileStream.scala
deleted file mode 100644
index 81938d30d4..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/FileStream.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.StreamingContext
-import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-
-object FileStream {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: FileStream <master> <new HDFS compatible directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
-
- // Create the new directory
- val directory = new Path(args(1))
- val fs = directory.getFileSystem(new Configuration())
- if (fs.exists(directory)) throw new Exception("This directory already exists")
- fs.mkdirs(directory)
- fs.deleteOnExit(directory)
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val inputStream = ssc.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
-
- // Creating new files in the directory
- val text = "This is a text file"
- for (i <- 1 to 30) {
- ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10)
- .saveAsTextFile(new Path(directory, i.toString).toString)
- Thread.sleep(1000)
- }
- Thread.sleep(5000) // Waiting for the file to be processed
- ssc.stop()
- System.exit(0)
- }
-}
\ No newline at end of file
diff --git a/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
deleted file mode 100644
index b7bc15a1d5..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-object FileStreamWithCheckpoint {
-
- def main(args: Array[String]) {
-
- if (args.size != 3) {
- println("FileStreamWithCheckpoint <master> <directory> <checkpoint dir>")
- println("FileStreamWithCheckpoint restart <directory> <checkpoint dir>")
- System.exit(-1)
- }
-
- val directory = new Path(args(1))
- val checkpointDir = args(2)
-
- val ssc: StreamingContext = {
-
- if (args(0) == "restart") {
-
- // Recreated streaming context from specified checkpoint file
- new StreamingContext(checkpointDir)
-
- } else {
-
- // Create directory if it does not exist
- val fs = directory.getFileSystem(new Configuration())
- if (!fs.exists(directory)) fs.mkdirs(directory)
-
- // Create new streaming context
- val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
- ssc_.checkpoint(checkpointDir)
-
- // Setup the streaming computation
- val inputStream = ssc_.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
-
- ssc_
- }
- }
-
- // Start the stream computation
- startFileWritingThread(directory.toString)
- ssc.start()
- }
-
- def startFileWritingThread(directory: String) {
-
- val fs = new Path(directory).getFileSystem(new Configuration())
-
- val fileWritingThread = new Thread() {
- override def run() {
- val r = new scala.util.Random()
- val text = "This is a sample text file with a random number "
- while(true) {
- val number = r.nextInt()
- val file = new Path(directory, number.toString)
- val fos = fs.create(file)
- fos.writeChars(text + number)
- fos.close()
- println("Created text file " + file)
- Thread.sleep(1000)
- }
- }
- }
- fileWritingThread.start()
- }
-
-}
diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
index e60ce483a3..461929fba2 100644
--- a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -5,7 +5,7 @@ import spark.storage.StorageLevel
import spark.streaming._
/**
- * Produce a streaming count of events received from Flume.
+ * Produces a count of events received from Flume.
*
* This should be used in conjunction with an AvroSink in Flume. It will start
 * an Avro server at the requested host:port address and listen for requests.
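
For readers of this diff, a minimal, self-contained sketch of how the documented behaviour could be exercised follows. The flumeStream method name, its (host, port, storageLevel) signature, and the count-and-print body are assumptions based on the comment above, not lines taken from this commit.

package spark.streaming.examples.sketch

import spark.storage.StorageLevel
import spark.streaming.{Seconds, StreamingContext}

// Hypothetical sketch (not part of this commit): counts Flume events per batch.
// Assumes the StreamingContext in this tree exposes flumeStream(host, port, storageLevel);
// verify the exact method name and signature before relying on it.
object FlumeEventCountSketch {
  def main(args: Array[String]) {
    val Array(master, host, port) = args
    val ssc = new StreamingContext(master, "FlumeEventCountSketch", Seconds(2))
    // Start an Avro server on host:port and receive events pushed by a Flume AvroSink
    val stream = ssc.flumeStream(host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
    // Print how many events arrived in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
  }
}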
diff --git a/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala b/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala
deleted file mode 100644
index 812faa368a..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-
-import spark.streaming._
-import spark.streaming.util.RawTextHelper._
-
-object GrepRaw {
- def main(args: Array[String]) {
- if (args.length != 5) {
- System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
- // Create the context
- val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
-
- val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
- val union = ssc.union(rawStreams)
- union.filter(_.contains("Alice")).count().foreach(r =>
- println("Grep count: " + r.collect().mkString))
- ssc.start()
- }
-}
diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
new file mode 100644
index 0000000000..8530f5c175
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
@@ -0,0 +1,36 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+
+/**
+ * Counts words in new text files created in the given directory
+ * Usage: HdfsWordCount <master> <directory>
+ * <master> is the Spark master URL.
+ * <directory> is the directory that Spark Streaming will use to find and read new text files.
+ *
+ * To run this on your local machine on directory `localdir`, run this example
+ * `$ ./run spark.streaming.examples.HdfsWordCount local[2] localdir`
+ * Then create a text file in `localdir` and the words in the file will get counted.
+ */
+object HdfsWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: HdfsWordCount <master> <directory>")
+ System.exit(1)
+ }
+
+ // Create the context
+ val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2))
+
+ // Create the FileInputDStream on the directory and use the
+ // stream to count words in new files created
+ val lines = ssc.textFileStream(args(1))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+}
+
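
To try HdfsWordCount locally, something along these lines can drop fresh files into the watched directory while the example is running. The helper is hypothetical (not part of this commit), and note that textFileStream only picks up files that appear after the streaming context has started.

import java.io.{File, PrintWriter}

// Hypothetical driver: writes a new small text file into the watched directory once a
// second so that a separately started HdfsWordCount has something to count.
object GenerateTextFiles {
  def main(args: Array[String]) {
    val dir = new File(if (args.length > 0) args(0) else "localdir")
    dir.mkdirs()
    for (i <- 1 to 10) {
      val writer = new PrintWriter(new File(dir, "words-" + i + ".txt"))
      writer.println((1 to 5).map(_ => "hello streaming world").mkString(" "))
      writer.close()
      Thread.sleep(1000)
    }
  }
}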
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index eadda60563..43c01d5db2 100644
--- a/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -3,16 +3,27 @@ package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._
-object WordCountNetwork {
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: NetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to in order to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ */
+object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+ System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
// Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
// Create a NetworkInputDStream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
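
The rest of NetworkWordCount is unchanged and therefore not shown in this hunk. As a rough, hypothetical sketch of how such a socket-based count is wired up (the networkTextStream method name is an assumption; later trees rename it to socketTextStream):

package spark.streaming.examples.sketch

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

// Hypothetical sketch (not part of this commit) of a socket-based word count.
object NetworkWordCountSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(args(0), "NetworkWordCountSketch", Seconds(1))
    // Create a DStream on hostname:port and count words in the '\n' delimited text
    val lines = ssc.networkTextStream(args(1), args(2).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
  }
}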
diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
new file mode 100644
index 0000000000..2eec777c54
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -0,0 +1,46 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+
+import spark.streaming._
+import spark.streaming.util.RawTextHelper
+
+/**
+ * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
+ * lines have the word 'the' in them. This is useful for benchmarking purposes. This
+ * will only work with spark.streaming.util.RawTextSender running on all worker nodes
+ * and with Spark using Kryo serialization (set Java property "spark.serializer" to
+ * "spark.KryoSerializer").
+ * Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>
+ * <master> is the Spark master URL
+ *   <numStreams> is the number of rawNetworkStreams, which should be the same as the number
+ *   of worker nodes in the cluster
+ * <host> is "localhost".
+ * <port> is the port on which RawTextSender is running in the worker nodes.
+ *   <batchMillis> is the Spark Streaming batch duration in milliseconds.
+ */
+
+object RawNetworkGrep {
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
+
+ // Create the context
+ val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ RawTextHelper.warmUp(ssc.sc)
+
+ val rawStreams = (1 to numStreams).map(_ =>
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+ val union = ssc.union(rawStreams)
+ union.filter(_.contains("the")).count().foreach(r =>
+ println("Grep count: " + r.collect().mkString))
+ ssc.start()
+ }
+}
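
As the class comment above notes, the raw network stream only works with Kryo serialization enabled. A minimal sketch of satisfying that requirement in the driver, assuming Spark reads the property when the SparkContext is created:

package spark.streaming.examples.sketch

// Hypothetical helper (not part of this commit): sets the serializer property that the
// RawNetworkGrep documentation asks for. It must run in the driver before the
// StreamingContext (and hence the SparkContext) is constructed.
object KryoSetupSketch {
  def enableKryo() {
    System.setProperty("spark.serializer", "spark.KryoSerializer")
  }
}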
diff --git a/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
deleted file mode 100644
index 338834bc3c..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object TopKWordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- val k = 10
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- /*warmUp(ssc.sc)*/
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate top K words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = ssc.union(lines)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
- partialTopKWindowedCounts.foreach(rdd => {
- val collectedCounts = rdd.collect
- println("Collected " + collectedCounts.size + " words from partial top words")
- println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
- })
-
- ssc.start()
- }
-}
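
The deleted TopKWordCountRaw (and WordCountRaw below) lean on the incremental form of reduceByKeyAndWindow: the first function folds new batches into the 30-second window while the second "subtracts" the batch that slides out, so each window is updated rather than recomputed, at the cost of requiring a checkpoint directory. A stripped-down, hypothetical sketch of that pattern over a file stream (directory paths and partition count are illustrative, not from this commit):

package spark.streaming.examples.sketch

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

// Hypothetical sketch (not part of this commit) of incremental windowed counting:
// `_ + _` adds the counts of batches entering the window, `_ - _` removes the counts of
// batches leaving it. Checkpointing is required because the result depends on prior state.
object WindowedCountSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(args(0), "WindowedCountSketch", Seconds(1))
    ssc.checkpoint("/tmp/windowed-count-sketch")  // illustrative checkpoint directory
    val words = ssc.textFileStream(args(1)).flatMap(_.split(" "))
    val windowedCounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(1), 10)
    windowedCounts.print()
    ssc.start()
  }
}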
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
deleted file mode 100644
index 867a8f42c4..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountHdfs {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCountHdfs <master> <directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val lines = ssc.textFileStream(args(1))
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- }
-}
-
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala
deleted file mode 100644
index d93335a8ce..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object WordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate count of words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = ssc.union(lines)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- windowedCounts.foreach(r => println("# unique words = " + r.count()))
-
- ssc.start()
- }
-}