aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-06 19:31:54 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-06 19:31:54 -0800
commitaf8738dfb592eb37d4d6c91e42624e844d4e493b (patch)
treee9996686a339fec90eef4e98009cbbc0a8a23606 /examples/src
parent934ecc829aa06ce4d9ded3596b86b4733ed2a123 (diff)
downloadspark-af8738dfb592eb37d4d6c91e42624e844d4e493b.tar.gz
spark-af8738dfb592eb37d4d6c91e42624e844d4e493b.tar.bz2
spark-af8738dfb592eb37d4d6c91e42624e844d4e493b.zip
Moved Spark Streaming examples to examples sub-project.
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/FileStream.scala46
-rw-r--r--examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala75
-rw-r--r--examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala43
-rw-r--r--examples/src/main/scala/spark/streaming/examples/GrepRaw.scala32
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala69
-rw-r--r--examples/src/main/scala/spark/streaming/examples/QueueStream.scala39
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala49
-rw-r--r--examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala25
-rw-r--r--examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala25
-rw-r--r--examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala43
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala85
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala84
12 files changed, 615 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/FileStream.scala b/examples/src/main/scala/spark/streaming/examples/FileStream.scala
new file mode 100644
index 0000000000..81938d30d4
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/FileStream.scala
@@ -0,0 +1,46 @@
+package spark.streaming.examples
+
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext._
+import spark.streaming.Seconds
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+
+object FileStream {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: FileStream <master> <new HDFS compatible directory>")
+ System.exit(1)
+ }
+
+ // Create the context
+ val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
+
+ // Create the new directory
+ val directory = new Path(args(1))
+ val fs = directory.getFileSystem(new Configuration())
+ if (fs.exists(directory)) throw new Exception("This directory already exists")
+ fs.mkdirs(directory)
+ fs.deleteOnExit(directory)
+
+ // Create the FileInputDStream on the directory and use the
+ // stream to count words in new files created
+ val inputStream = ssc.textFileStream(directory.toString)
+ val words = inputStream.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+
+ // Creating new files in the directory
+ val text = "This is a text file"
+ for (i <- 1 to 30) {
+ ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10)
+ .saveAsTextFile(new Path(directory, i.toString).toString)
+ Thread.sleep(1000)
+ }
+ Thread.sleep(5000) // Waiting for the file to be processed
+ ssc.stop()
+ System.exit(0)
+ }
+} \ No newline at end of file
diff --git a/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
new file mode 100644
index 0000000000..b7bc15a1d5
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
@@ -0,0 +1,75 @@
+package spark.streaming.examples
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+object FileStreamWithCheckpoint {
+
+ def main(args: Array[String]) {
+
+ if (args.size != 3) {
+ println("FileStreamWithCheckpoint <master> <directory> <checkpoint dir>")
+ println("FileStreamWithCheckpoint restart <directory> <checkpoint dir>")
+ System.exit(-1)
+ }
+
+ val directory = new Path(args(1))
+ val checkpointDir = args(2)
+
+ val ssc: StreamingContext = {
+
+ if (args(0) == "restart") {
+
+ // Recreated streaming context from specified checkpoint file
+ new StreamingContext(checkpointDir)
+
+ } else {
+
+ // Create directory if it does not exist
+ val fs = directory.getFileSystem(new Configuration())
+ if (!fs.exists(directory)) fs.mkdirs(directory)
+
+ // Create new streaming context
+ val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
+ ssc_.checkpoint(checkpointDir)
+
+ // Setup the streaming computation
+ val inputStream = ssc_.textFileStream(directory.toString)
+ val words = inputStream.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+
+ ssc_
+ }
+ }
+
+ // Start the stream computation
+ startFileWritingThread(directory.toString)
+ ssc.start()
+ }
+
+ def startFileWritingThread(directory: String) {
+
+ val fs = new Path(directory).getFileSystem(new Configuration())
+
+ val fileWritingThread = new Thread() {
+ override def run() {
+ val r = new scala.util.Random()
+ val text = "This is a sample text file with a random number "
+ while(true) {
+ val number = r.nextInt()
+ val file = new Path(directory, number.toString)
+ val fos = fs.create(file)
+ fos.writeChars(text + number)
+ fos.close()
+ println("Created text file " + file)
+ Thread.sleep(1000)
+ }
+ }
+ }
+ fileWritingThread.start()
+ }
+
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
new file mode 100644
index 0000000000..e60ce483a3
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -0,0 +1,43 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+import spark.streaming._
+
+/**
+ * Produce a streaming count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server on at the request host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: FlumeEventCount <master> <host> <port>
+ *
+ * <master> is a Spark master URL
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ */
+object FlumeEventCount {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println(
+ "Usage: FlumeEventCount <master> <host> <port>")
+ System.exit(1)
+ }
+
+ val Array(master, host, IntParam(port)) = args
+
+ val batchInterval = Milliseconds(2000)
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
+
+ // Create a flume stream
+ val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
+
+ // Print out the count of events received from this server in each batch
+ stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala b/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala
new file mode 100644
index 0000000000..812faa368a
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -0,0 +1,32 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+
+import spark.streaming._
+import spark.streaming.util.RawTextHelper._
+
+object GrepRaw {
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
+
+ // Create the context
+ val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ warmUp(ssc.sc)
+
+
+ val rawStreams = (1 to numStreams).map(_ =>
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+ val union = ssc.union(rawStreams)
+ union.filter(_.contains("Alice")).count().foreach(r =>
+ println("Grep count: " + r.collect().mkString))
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
new file mode 100644
index 0000000000..fe55db6e2c
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -0,0 +1,69 @@
+package spark.streaming.examples
+
+import java.util.Properties
+import kafka.message.Message
+import kafka.producer.SyncProducerConfig
+import kafka.producer._
+import spark.SparkContext
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.storage.StorageLevel
+import spark.streaming.util.RawTextHelper._
+
+object KafkaWordCount {
+ def main(args: Array[String]) {
+
+ if (args.length < 6) {
+ System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+ System.exit(1)
+ }
+
+ val Array(master, hostname, port, group, topics, numThreads) = args
+
+ val sc = new SparkContext(master, "KafkaWordCount")
+ val ssc = new StreamingContext(sc, Seconds(2))
+ ssc.checkpoint("checkpoint")
+
+ val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
+ val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+ wordCounts.print()
+
+ ssc.start()
+ }
+}
+
+// Produces some random words between 1 and 100.
+object KafkaWordCountProducer {
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+ System.exit(1)
+ }
+
+ val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+
+ // Zookeper connection properties
+ val props = new Properties()
+ props.put("zk.connect", hostname + ":" + port)
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+ val config = new ProducerConfig(props)
+ val producer = new Producer[String, String](config)
+
+ // Send some messages
+ while(true) {
+ val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+ (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+ }.toArray
+ println(messages.mkString(","))
+ val data = new ProducerData[String, String](topic, messages)
+ producer.send(data)
+ Thread.sleep(100)
+ }
+ }
+
+}
+
diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
new file mode 100644
index 0000000000..2a265d021d
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -0,0 +1,39 @@
+package spark.streaming.examples
+
+import spark.RDD
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+import scala.collection.mutable.SynchronizedQueue
+
+object QueueStream {
+
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: QueueStream <master>")
+ System.exit(1)
+ }
+
+ // Create the context
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+
+ // Create the queue through which RDDs can be pushed to
+ // a QueueInputDStream
+ val rddQueue = new SynchronizedQueue[RDD[Int]]()
+
+ // Create the QueueInputDStream and use it do some processing
+ val inputStream = ssc.queueStream(rddQueue)
+ val mappedStream = inputStream.map(x => (x % 10, 1))
+ val reducedStream = mappedStream.reduceByKey(_ + _)
+ reducedStream.print()
+ ssc.start()
+
+ // Create and push some RDDs into
+ for (i <- 1 to 30) {
+ rddQueue += ssc.sc.makeRDD(1 to 1000, 10)
+ Thread.sleep(1000)
+ }
+ ssc.stop()
+ System.exit(0)
+ }
+} \ No newline at end of file
diff --git a/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
new file mode 100644
index 0000000000..338834bc3c
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -0,0 +1,49 @@
+package spark.streaming.examples
+
+import spark.storage.StorageLevel
+import spark.util.IntParam
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
+
+import java.util.UUID
+
+object TopKWordCountRaw {
+
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
+ val k = 10
+
+ // Create the context, and set the checkpoint directory.
+ // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+ // periodically to HDFS
+ val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ /*warmUp(ssc.sc)*/
+
+ // Set up the raw network streams that will connect to localhost:port to raw test
+ // senders on the slaves and generate top K words of last 30 seconds
+ val lines = (1 to numStreams).map(_ => {
+ ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+ })
+ val union = ssc.union(lines)
+ val counts = union.mapPartitions(splitAndCountPartitions)
+ val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
+ val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
+ partialTopKWindowedCounts.foreach(rdd => {
+ val collectedCounts = rdd.collect
+ println("Collected " + collectedCounts.size + " words from partial top words")
+ println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
+ })
+
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
new file mode 100644
index 0000000000..867a8f42c4
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
@@ -0,0 +1,25 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+object WordCountHdfs {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: WordCountHdfs <master> <directory>")
+ System.exit(1)
+ }
+
+ // Create the context
+ val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
+
+ // Create the FileInputDStream on the directory and use the
+ // stream to count words in new files created
+ val lines = ssc.textFileStream(args(1))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+}
+
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
new file mode 100644
index 0000000000..eadda60563
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -0,0 +1,25 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+object WordCountNetwork {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
+
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited test (eg. generated by 'nc')
+ val lines = ssc.networkTextStream(args(1), args(2).toInt)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala
new file mode 100644
index 0000000000..d93335a8ce
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -0,0 +1,43 @@
+package spark.streaming.examples
+
+import spark.storage.StorageLevel
+import spark.util.IntParam
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
+
+import java.util.UUID
+
+object WordCountRaw {
+
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
+
+ // Create the context, and set the checkpoint directory.
+ // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+ // periodically to HDFS
+ val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ warmUp(ssc.sc)
+
+ // Set up the raw network streams that will connect to localhost:port to raw test
+ // senders on the slaves and generate count of words of last 30 seconds
+ val lines = (1 to numStreams).map(_ => {
+ ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+ })
+ val union = ssc.union(lines)
+ val counts = union.mapPartitions(splitAndCountPartitions)
+ val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
+ windowedCounts.foreach(r => println("# unique words = " + r.count()))
+
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
new file mode 100644
index 0000000000..4c6e08bc74
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -0,0 +1,85 @@
+package spark.streaming.examples.clickstream
+
+import java.net.{InetAddress,ServerSocket,Socket,SocketException}
+import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import util.Random
+
+/** Represents a page view on a website with associated dimension data.*/
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+ override def toString() : String = {
+ "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
+ }
+}
+object PageView {
+ def fromString(in : String) : PageView = {
+ val parts = in.split("\t")
+ new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
+ }
+}
+
+/** Generates streaming events to simulate page views on a website.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * */
+object PageViewGenerator {
+ val pages = Map("http://foo.com/" -> .7,
+ "http://foo.com/news" -> 0.2,
+ "http://foo.com/contact" -> .1)
+ val httpStatus = Map(200 -> .95,
+ 404 -> .05)
+ val userZipCode = Map(94709 -> .5,
+ 94117 -> .5)
+ val userID = Map((1 to 100).map(_ -> .01):_*)
+
+
+ def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+ val rand = new Random().nextDouble()
+ var total = 0.0
+ for ((item, prob) <- inputMap) {
+ total = total + prob
+ if (total > rand) {
+ return item
+ }
+ }
+ return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+ }
+
+ def getNextClickEvent() : String = {
+ val id = pickFromDistribution(userID)
+ val page = pickFromDistribution(pages)
+ val status = pickFromDistribution(httpStatus)
+ val zipCode = pickFromDistribution(userZipCode)
+ new PageView(page, status, zipCode, id).toString()
+ }
+
+ def main(args : Array[String]) {
+ if (args.length != 2) {
+ System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
+ System.exit(1)
+ }
+ val port = args(0).toInt
+ val viewsPerSecond = args(1).toFloat
+ val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
+ val listener = new ServerSocket(port)
+ println("Listening on port: " + port)
+
+ while (true) {
+ val socket = listener.accept()
+ new Thread() {
+ override def run = {
+ println("Got client connected from: " + socket.getInetAddress)
+ val out = new PrintWriter(socket.getOutputStream(), true)
+
+ while (true) {
+ Thread.sleep(sleepDelayMs)
+ out.write(getNextClickEvent())
+ out.flush()
+ }
+ socket.close()
+ }
+ }.start()
+ }
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
new file mode 100644
index 0000000000..a191321d91
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -0,0 +1,84 @@
+package spark.streaming.examples.clickstream
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+import spark.SparkContext._
+
+/** Analyses a streaming dataset of web page views. This class demonstrates several types of
+ * operators available in Spark streaming.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * */
+object PageViewStream {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: PageViewStream <metric> <host> <port>")
+ System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
+ " errorRatePerZipCode, activeUserCount, popularUsersSeen")
+ System.exit(1)
+ }
+ val metric = args(0)
+ val host = args(1)
+ val port = args(2).toInt
+
+ // Create the context
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
+
+ // Create a NetworkInputDStream on target host:port and convert each line to a PageView
+ val pageViews = ssc.networkTextStream(host, port)
+ .flatMap(_.split("\n"))
+ .map(PageView.fromString(_))
+
+ // Return a count of views per URL seen in each batch
+ val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+
+ // Return a sliding window of page views per URL in the last ten seconds
+ val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
+ .window(Seconds(10), Seconds(2))
+ .countByKey()
+
+
+ // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
+ val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
+ .map(view => ((view.zipCode, view.status)))
+ .groupByKey()
+ val errorRatePerZipCode = statusesPerZipCode.map{
+ case(zip, statuses) =>
+ val normalCount = statuses.filter(_ == 200).size
+ val errorCount = statuses.size - normalCount
+ val errorRatio = errorCount.toFloat / statuses.size
+ if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
+ else {"%s: %s".format(zip, errorRatio)}
+ }
+
+ // Return the number unique users in last 15 seconds
+ val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
+ .map(view => (view.userID, 1))
+ .groupByKey()
+ .count()
+ .map("Unique active users: " + _)
+
+ // An external dataset we want to join to this stream
+ val userList = ssc.sc.parallelize(
+ Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
+
+ metric match {
+ case "pageCounts" => pageCounts.print()
+ case "slidingPageCounts" => slidingPageCounts.print()
+ case "errorRatePerZipCode" => errorRatePerZipCode.print()
+ case "activeUserCount" => activeUserCount.print()
+ case "popularUsersSeen" =>
+ // Look for users in our existing dataset and print it out if we have a match
+ pageViews.map(view => (view.userID, 1))
+ .foreach((rdd, time) => rdd.join(userList)
+ .map(_._2._2)
+ .take(10)
+ .foreach(u => println("Saw user %s at time %s".format(u, time))))
+ case _ => println("Invalid metric entered: " + metric)
+ }
+
+ ssc.start()
+ }
+}