From af8738dfb592eb37d4d6c91e42624e844d4e493b Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sun, 6 Jan 2013 19:31:54 -0800
Subject: Moved Spark Streaming examples to examples sub-project.

---
 .../spark/streaming/examples/FileStream.scala      | 46 ++++++++++++
 .../examples/FileStreamWithCheckpoint.scala        | 75 +++++++++++++++++++
 .../spark/streaming/examples/FlumeEventCount.scala | 43 +++++++++++
 .../scala/spark/streaming/examples/GrepRaw.scala   | 32 ++++++++
 .../spark/streaming/examples/KafkaWordCount.scala  | 69 ++++++++++++++++++
 .../spark/streaming/examples/QueueStream.scala     | 39 ++++++++++
 .../streaming/examples/TopKWordCountRaw.scala      | 49 +++++++++++++
 .../spark/streaming/examples/WordCountHdfs.scala   | 25 +++++++
 .../streaming/examples/WordCountNetwork.scala      | 25 +++++++
 .../spark/streaming/examples/WordCountRaw.scala    | 43 +++++++++++
 .../examples/clickstream/PageViewGenerator.scala   | 85 ++++++++++++++++++++++
 .../examples/clickstream/PageViewStream.scala      | 84 +++++++++++++++++++++
 12 files changed, 615 insertions(+)
 create mode 100644 examples/src/main/scala/spark/streaming/examples/FileStream.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/GrepRaw.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/QueueStream.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
 create mode 100644 examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala

diff --git a/examples/src/main/scala/spark/streaming/examples/FileStream.scala b/examples/src/main/scala/spark/streaming/examples/FileStream.scala
new file mode 100644
index 0000000000..81938d30d4
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/FileStream.scala
@@ -0,0 +1,46 @@
+package spark.streaming.examples
+
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext._
+import spark.streaming.Seconds
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+
+object FileStream {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: FileStream <master> <new directory>")
+      System.exit(1)
+    }
+
+    // Create the context
+    val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
+
+    // Create the new directory
+    val directory = new Path(args(1))
+    val fs = directory.getFileSystem(new Configuration())
+    if (fs.exists(directory)) throw new Exception("This directory already exists")
+    fs.mkdirs(directory)
+    fs.deleteOnExit(directory)
+
+    // Create the FileInputDStream on the directory and use the
+    // stream to count words in new files created
+    val inputStream = ssc.textFileStream(directory.toString)
+    val words = inputStream.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+
+    // Create new files in the directory
+    val text = "This is a text file"
= "This is a text file" + for (i <- 1 to 30) { + ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10) + .saveAsTextFile(new Path(directory, i.toString).toString) + Thread.sleep(1000) + } + Thread.sleep(5000) // Waiting for the file to be processed + ssc.stop() + System.exit(0) + } +} \ No newline at end of file diff --git a/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala new file mode 100644 index 0000000000..b7bc15a1d5 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala @@ -0,0 +1,75 @@ +package spark.streaming.examples + +import spark.streaming._ +import spark.streaming.StreamingContext._ +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration + +object FileStreamWithCheckpoint { + + def main(args: Array[String]) { + + if (args.size != 3) { + println("FileStreamWithCheckpoint ") + println("FileStreamWithCheckpoint restart ") + System.exit(-1) + } + + val directory = new Path(args(1)) + val checkpointDir = args(2) + + val ssc: StreamingContext = { + + if (args(0) == "restart") { + + // Recreated streaming context from specified checkpoint file + new StreamingContext(checkpointDir) + + } else { + + // Create directory if it does not exist + val fs = directory.getFileSystem(new Configuration()) + if (!fs.exists(directory)) fs.mkdirs(directory) + + // Create new streaming context + val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1)) + ssc_.checkpoint(checkpointDir) + + // Setup the streaming computation + val inputStream = ssc_.textFileStream(directory.toString) + val words = inputStream.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + + ssc_ + } + } + + // Start the stream computation + startFileWritingThread(directory.toString) + ssc.start() + } + + def startFileWritingThread(directory: String) { + + val fs = new Path(directory).getFileSystem(new Configuration()) + + val fileWritingThread = new Thread() { + override def run() { + val r = new scala.util.Random() + val text = "This is a sample text file with a random number " + while(true) { + val number = r.nextInt() + val file = new Path(directory, number.toString) + val fos = fs.create(file) + fos.writeChars(text + number) + fos.close() + println("Created text file " + file) + Thread.sleep(1000) + } + } + } + fileWritingThread.start() + } + +} diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala new file mode 100644 index 0000000000..e60ce483a3 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala @@ -0,0 +1,43 @@ +package spark.streaming.examples + +import spark.util.IntParam +import spark.storage.StorageLevel +import spark.streaming._ + +/** + * Produce a streaming count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: FlumeEventCount + * + * is a Spark master URL + * is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * is the port the Flume receiver will listen on. 
+ */
+object FlumeEventCount {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println(
+        "Usage: FlumeEventCount <master> <host> <port>")
+      System.exit(1)
+    }
+
+    val Array(master, host, IntParam(port)) = args
+
+    val batchInterval = Milliseconds(2000)
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
+
+    // Create a flume stream
+    val stream = ssc.flumeStream(host, port, StorageLevel.MEMORY_ONLY)
+
+    // Print out the count of events received from this server in each batch
+    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
+
+    ssc.start()
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala b/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala
new file mode 100644
index 0000000000..812faa368a
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -0,0 +1,32 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+
+import spark.streaming._
+import spark.streaming.util.RawTextHelper._
+
+object GrepRaw {
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>")
+      System.exit(1)
+    }
+
+    val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
+
+    // Create the context
+    val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
+
+    // Warm up the JVMs on master and slave for JIT compilation to kick in
+    warmUp(ssc.sc)
+
+
+    val rawStreams = (1 to numStreams).map(_ =>
+      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+    val union = ssc.union(rawStreams)
+    union.filter(_.contains("Alice")).count().foreach(r =>
+      println("Grep count: " + r.collect().mkString))
+    ssc.start()
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
new file mode 100644
index 0000000000..fe55db6e2c
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -0,0 +1,69 @@
+package spark.streaming.examples
+
+import java.util.Properties
+import kafka.message.Message
+import kafka.producer.SyncProducerConfig
+import kafka.producer._
+import spark.SparkContext
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.storage.StorageLevel
+import spark.streaming.util.RawTextHelper._
+
+object KafkaWordCount {
+  def main(args: Array[String]) {
+
+    if (args.length < 6) {
+      System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+      System.exit(1)
+    }
+
+    val Array(master, hostname, port, group, topics, numThreads) = args
+
+    val sc = new SparkContext(master, "KafkaWordCount")
+    val ssc = new StreamingContext(sc, Seconds(2))
+    ssc.checkpoint("checkpoint")
+
+    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
+    val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+    wordCounts.print()
+
+    ssc.start()
+  }
+}
+
+// Produces some random words and sends them to a Kafka topic.
+object KafkaWordCountProducer {
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+
+    // Zookeeper connection properties
+    val props = new Properties()
+    props.put("zk.connect", hostname + ":" + port)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // Send some messages
+    while(true) {
+      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+        (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+      }.toArray
+      println(messages.mkString(","))
+      val data = new ProducerData[String, String](topic, messages)
+      producer.send(data)
+      Thread.sleep(100)
+    }
+  }
+
+}
+
diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
new file mode 100644
index 0000000000..2a265d021d
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -0,0 +1,39 @@
+package spark.streaming.examples
+
+import spark.RDD
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+import scala.collection.mutable.SynchronizedQueue
+
+object QueueStream {
+
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      System.err.println("Usage: QueueStream <master>")
+      System.exit(1)
+    }
+
+    // Create the context
+    val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+
+    // Create the queue through which RDDs can be pushed to
+    // a QueueInputDStream
+    val rddQueue = new SynchronizedQueue[RDD[Int]]()
+
+    // Create the QueueInputDStream and use it to do some processing
+    val inputStream = ssc.queueStream(rddQueue)
+    val mappedStream = inputStream.map(x => (x % 10, 1))
+    val reducedStream = mappedStream.reduceByKey(_ + _)
+    reducedStream.print()
+    ssc.start()
+
+    // Create and push some RDDs into the queue
+    for (i <- 1 to 30) {
+      rddQueue += ssc.sc.makeRDD(1 to 1000, 10)
+      Thread.sleep(1000)
+    }
+    ssc.stop()
+    System.exit(0)
+  }
+}
\ No newline at end of file
diff --git a/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
new file mode 100644
index 0000000000..338834bc3c
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -0,0 +1,49 @@
+package spark.streaming.examples
+
+import spark.storage.StorageLevel
+import spark.util.IntParam
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
+
+import java.util.UUID
+
+object TopKWordCountRaw {
+
+  def main(args: Array[String]) {
+    if (args.length != 4) {
+      System.err.println("Usage: TopKWordCountRaw <master> <# streams> <port> <checkpoint dir>")
+      System.exit(1)
+    }
+
+    val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
+    val k = 10
+
+    // Create the context, and set the checkpoint directory.
+    // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+    // periodically to HDFS
+    val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
+    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+    // Warm up the JVMs on master and slave for JIT compilation to kick in
+    /*warmUp(ssc.sc)*/
+
+    // Set up the raw network streams that will connect to localhost:port to raw test
+    // senders on the slaves and generate the top K words of the last 30 seconds
+    val lines = (1 to numStreams).map(_ => {
+      ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+    })
+    val union = ssc.union(lines)
+    val counts = union.mapPartitions(splitAndCountPartitions)
+    val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
+    val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
+    partialTopKWindowedCounts.foreach(rdd => {
+      val collectedCounts = rdd.collect
+      println("Collected " + collectedCounts.size + " words from partial top words")
+      println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
+    })
+
+    ssc.start()
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
new file mode 100644
index 0000000000..867a8f42c4
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
@@ -0,0 +1,25 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+object WordCountHdfs {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: WordCountHdfs <master> <directory>")
+      System.exit(1)
+    }
+
+    // Create the context
+    val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
+
+    // Create the FileInputDStream on the directory and use the
+    // stream to count words in new files created
+    val lines = ssc.textFileStream(args(1))
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+  }
+}
+
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
new file mode 100644
index 0000000000..eadda60563
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -0,0 +1,25 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+object WordCountNetwork {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
+
+    // Create a NetworkInputDStream on target ip:port and count the
+    // words in the input stream of \n delimited text (e.g. generated by 'nc')
+    val lines = ssc.networkTextStream(args(1), args(2).toInt)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala
new file mode 100644
index 0000000000..d93335a8ce
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -0,0 +1,43 @@
+package spark.streaming.examples
+
+import spark.storage.StorageLevel
+import spark.util.IntParam
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
+
+import java.util.UUID
+
+object WordCountRaw {
+
+  def main(args: Array[String]) {
+    if (args.length != 4) {
+      System.err.println("Usage: WordCountRaw <master> <# streams> <port> <checkpoint dir>")
+      System.exit(1)
+    }
+
+    val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
+
+    // Create the context, and set the checkpoint directory.
+    // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+    // periodically to HDFS
+    val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
+    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+    // Warm up the JVMs on master and slave for JIT compilation to kick in
+    warmUp(ssc.sc)
+
+    // Set up the raw network streams that will connect to localhost:port to raw test
+    // senders on the slaves and generate the count of words over the last 30 seconds
+    val lines = (1 to numStreams).map(_ => {
+      ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+    })
+    val union = ssc.union(lines)
+    val counts = union.mapPartitions(splitAndCountPartitions)
+    val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
+    windowedCounts.foreach(r => println("# unique words = " + r.count()))
+
+    ssc.start()
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
new file mode 100644
index 0000000000..4c6e08bc74
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -0,0 +1,85 @@
+package spark.streaming.examples.clickstream
+
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import util.Random
+
+/** Represents a page view on a website with associated dimension data. */
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+  override def toString() : String = {
+    "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
+  }
+}
+object PageView {
+  def fromString(in : String) : PageView = {
+    val parts = in.split("\t")
+    new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
+  }
+}
+
+/** Generates streaming events to simulate page views on a website.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * */
+object PageViewGenerator {
+  val pages = Map("http://foo.com/"        -> .7,
+                  "http://foo.com/news"    -> 0.2,
+                  "http://foo.com/contact" -> .1)
+  val httpStatus = Map(200 -> .95,
+                       404 -> .05)
+  val userZipCode = Map(94709 -> .5,
+                        94117 -> .5)
+  val userID = Map((1 to 100).map(_ -> .01):_*)
+
+
+  def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+    val rand = new Random().nextDouble()
+    var total = 0.0
+    for ((item, prob) <- inputMap) {
+      total = total + prob
+      if (total > rand) {
+        return item
+      }
+    }
+    return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+  }
+
+  def getNextClickEvent() : String = {
+    val id = pickFromDistribution(userID)
+    val page = pickFromDistribution(pages)
+    val status = pickFromDistribution(httpStatus)
+    val zipCode = pickFromDistribution(userZipCode)
+    new PageView(page, status, zipCode, id).toString()
+  }
+
+  def main(args : Array[String]) {
+    if (args.length != 2) {
+      System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
+      System.exit(1)
+    }
+    val port = args(0).toInt
+    val viewsPerSecond = args(1).toFloat
+    val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
+    val listener = new ServerSocket(port)
+    println("Listening on port: " + port)
+
+    while (true) {
+      val socket = listener.accept()
+      new Thread() {
+        override def run = {
+          println("Got client connected from: " + socket.getInetAddress)
+          val out = new PrintWriter(socket.getOutputStream(), true)
+
+          while (true) {
+            Thread.sleep(sleepDelayMs)
+            out.write(getNextClickEvent())
+            out.flush()
+          }
+          socket.close()
+        }
+      }.start()
+    }
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
new file mode 100644
index 0000000000..a191321d91
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -0,0 +1,84 @@
+package spark.streaming.examples.clickstream
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+import spark.SparkContext._
+
+/** Analyses a streaming dataset of web page views. This class demonstrates several types of
+ * operators available in Spark streaming.
+ *
+ * This should be used in tandem with PageViewGenerator.scala. Example:
+ * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * */
+object PageViewStream {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: PageViewStream <metric> <host> <port>")
+      System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
+                         " errorRatePerZipCode, activeUserCount, popularUsersSeen")
+      System.exit(1)
+    }
+    val metric = args(0)
+    val host = args(1)
+    val port = args(2).toInt
+
+    // Create the context
+    val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
+
+    // Create a NetworkInputDStream on target host:port and convert each line to a PageView
+    val pageViews = ssc.networkTextStream(host, port)
+                       .flatMap(_.split("\n"))
+                       .map(PageView.fromString(_))
+
+    // Return a count of views per URL seen in each batch
+    val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+
+    // Return a sliding window of page views per URL in the last ten seconds
+    val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
+                                     .window(Seconds(10), Seconds(2))
+                                     .countByKey()
+
+
+    // Return the rate of error pages (a non-200 status) in each zip code over the last 30 seconds
+    val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
+                                      .map(view => ((view.zipCode, view.status)))
+                                      .groupByKey()
+    val errorRatePerZipCode = statusesPerZipCode.map{
+      case(zip, statuses) =>
+        val normalCount = statuses.filter(_ == 200).size
+        val errorCount = statuses.size - normalCount
+        val errorRatio = errorCount.toFloat / statuses.size
+        if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
+        else {"%s: %s".format(zip, errorRatio)}
+    }
+
+    // Return the number of unique users in the last 15 seconds
+    val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
+                                   .map(view => (view.userID, 1))
+                                   .groupByKey()
+                                   .count()
+                                   .map("Unique active users: " + _)
+
+    // An external dataset we want to join to this stream
+    val userList = ssc.sc.parallelize(
+      Map(1 -> "Patrick Wendell", 2 -> "Reynold Xin", 3 -> "Matei Zaharia").toSeq)
+
+    metric match {
+      case "pageCounts" => pageCounts.print()
+      case "slidingPageCounts" => slidingPageCounts.print()
+      case "errorRatePerZipCode" => errorRatePerZipCode.print()
+      case "activeUserCount" => activeUserCount.print()
+      case "popularUsersSeen" =>
+        // Look for users in our existing dataset and print it out if we have a match
+        pageViews.map(view => (view.userID, 1))
+          .foreach((rdd, time) => rdd.join(userList)
+            .map(_._2._2)
+            .take(10)
+            .foreach(u => println("Saw user %s at time %s".format(u, time))))
+      case _ => println("Invalid metric entered: " + metric)
+    }
+
+    ssc.start()
+  }
+}
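--

A note on trying the moved examples out, following the run commands already given in the clickstream source comments above (the `./run` launcher script at the repository root, the `local[2]` master, and the port numbers below are assumptions for illustration, not part of this patch). The clickstream pair is meant to be run in two terminals:

  $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
  $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444

WordCountNetwork can likewise be fed from a local netcat session on any free port:

  $ nc -lk 9999
  $ ./run spark.streaming.examples.WordCountNetwork local[2] localhost 9999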