From af8738dfb592eb37d4d6c91e42624e844d4e493b Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sun, 6 Jan 2013 19:31:54 -0800
Subject: Moved Spark Streaming examples to examples sub-project.

---
 .../spark/streaming/examples/FileStream.scala      | 46 ------------
 .../examples/FileStreamWithCheckpoint.scala        | 75 -------------------
 .../spark/streaming/examples/FlumeEventCount.scala | 43 -----------
 .../scala/spark/streaming/examples/GrepRaw.scala   | 33 ---------
 .../spark/streaming/examples/KafkaWordCount.scala  | 69 ------------------
 .../spark/streaming/examples/QueueStream.scala     | 39 ----------
 .../streaming/examples/TopKWordCountRaw.scala      | 49 -------------
 .../spark/streaming/examples/WordCountHdfs.scala   | 25 -------
 .../streaming/examples/WordCountNetwork.scala      | 25 -------
 .../spark/streaming/examples/WordCountRaw.scala    | 43 -----------
 .../examples/clickstream/PageViewGenerator.scala   | 85 ----------------------
 .../examples/clickstream/PageViewStream.scala      | 84 ---------------------
 12 files changed, 616 deletions(-)
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/FileStream.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
(limited to 'streaming')

diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
deleted file mode 100644
index 81938d30d4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.StreamingContext
-import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-
-object FileStream {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println("Usage: FileStream <master> <new directory>")
-      System.exit(1)
-    }
-
-    // Create the context
-    val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
-
-    // Create the new directory
-    val directory = new Path(args(1))
-    val fs = directory.getFileSystem(new Configuration())
-    if (fs.exists(directory)) throw new Exception("This directory already exists")
-    fs.mkdirs(directory)
-    fs.deleteOnExit(directory)
-
-    // Create the FileInputDStream on the directory and use the
-    // stream to count words in new files created
-    val inputStream = ssc.textFileStream(directory.toString)
-    val words = inputStream.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.print()
-    ssc.start()
-
-    // Creating new files in the directory
-    val text = "This is a text file"
-    for (i <- 1 to 30) {
-      ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10)
-        .saveAsTextFile(new Path(directory, i.toString).toString)
-      Thread.sleep(1000)
-    }
-    Thread.sleep(5000) // Waiting for the file to be processed
-    ssc.stop()
-    System.exit(0)
-  }
-}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
deleted file mode 100644
index b7bc15a1d5..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-object FileStreamWithCheckpoint {
-
-  def main(args: Array[String]) {
-
-    if (args.size != 3) {
-      println("FileStreamWithCheckpoint <master> <directory> <checkpoint dir>")
-      println("FileStreamWithCheckpoint restart <directory> <checkpoint dir>")
-      System.exit(-1)
-    }
-
-    val directory = new Path(args(1))
-    val checkpointDir = args(2)
-
-    val ssc: StreamingContext = {
-
-      if (args(0) == "restart") {
-
-        // Recreated streaming context from specified checkpoint file
-        new StreamingContext(checkpointDir)
-
-      } else {
-
-        // Create directory if it does not exist
-        val fs = directory.getFileSystem(new Configuration())
-        if (!fs.exists(directory)) fs.mkdirs(directory)
-
-        // Create new streaming context
-        val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
-        ssc_.checkpoint(checkpointDir)
-
-        // Setup the streaming computation
-        val inputStream = ssc_.textFileStream(directory.toString)
-        val words = inputStream.flatMap(_.split(" "))
-        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-        wordCounts.print()
-
-        ssc_
-      }
-    }
-
-    // Start the stream computation
-    startFileWritingThread(directory.toString)
-    ssc.start()
-  }
-
-  def startFileWritingThread(directory: String) {
-
-    val fs = new Path(directory).getFileSystem(new Configuration())
-
-    val fileWritingThread = new Thread() {
-      override def run() {
-        val r = new scala.util.Random()
-        val text = "This is a sample text file with a random number "
-        while(true) {
-          val number = r.nextInt()
-          val file = new Path(directory, number.toString)
-          val fos = fs.create(file)
-          fos.writeChars(text + number)
-          fos.close()
-          println("Created text file " + file)
-          Thread.sleep(1000)
-        }
-      }
-    }
-    fileWritingThread.start()
-  }
-
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
deleted file mode 100644
index e60ce483a3..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-import spark.streaming._
-
-/**
- * Produce a streaming count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server on at the request host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: FlumeEventCount <master> <host> <port>
- *
- *   <master> is a Spark master URL
- *   <host> is the host the Flume receiver will be started on - a receiver
- *     creates a server and listens for flume events.
- *   <port> is the port the Flume receiver will listen on.
- */
-object FlumeEventCount {
-  def main(args: Array[String]) {
-    if (args.length != 3) {
-      System.err.println(
-        "Usage: FlumeEventCount <master> <host> <port>")
-      System.exit(1)
-    }
-
-    val Array(master, host, IntParam(port)) = args
-
-    val batchInterval = Milliseconds(2000)
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
-
-    // Create a flume stream
-    val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
-
-    // Print out the count of events received from this server in each batch
-    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
-    ssc.start()
-  }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
deleted file mode 100644
index dfaaf03f03..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-object GrepRaw {
-  def main(args: Array[String]) {
-    if (args.length != 5) {
-      System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>")
-      System.exit(1)
-    }
-
-    val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
-    // Create the context
-    val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
-
-    // Warm up the JVMs on master and slave for JIT compilation to kick in
-    warmUp(ssc.sc)
-
-
-    val rawStreams = (1 to numStreams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
-    val union = ssc.union(rawStreams)
-    union.filter(_.contains("Alice")).count().foreach(r =>
-      println("Grep count: " + r.collect().mkString))
-    ssc.start()
-  }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
deleted file mode 100644
index fe55db6e2c..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package spark.streaming.examples
-
-import java.util.Properties
-import kafka.message.Message
-import kafka.producer.SyncProducerConfig
-import kafka.producer._
-import spark.SparkContext
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.storage.StorageLevel
-import spark.streaming.util.RawTextHelper._
-
-object KafkaWordCount {
-  def main(args: Array[String]) {
-
-    if (args.length < 6) {
-      System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
-      System.exit(1)
-    }
-
-    val Array(master, hostname, port, group, topics, numThreads) = args
-
-    val sc = new SparkContext(master, "KafkaWordCount")
-    val ssc = new StreamingContext(sc, Seconds(2))
-    ssc.checkpoint("checkpoint")
-
-    val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
-    val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
-    wordCounts.print()
-
-    ssc.start()
-  }
-}
-
-// Produces some random words between 1 and 100.
-object KafkaWordCountProducer {
-
-  def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
-      System.exit(1)
-    }
-
-    val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
-
-    // Zookeper connection properties
-    val props = new Properties()
-    props.put("zk.connect", hostname + ":" + port)
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
-
-    // Send some messages
-    while(true) {
-      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
-        (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
-      }.toArray
-      println(messages.mkString(","))
-      val data = new ProducerData[String, String](topic, messages)
-      producer.send(data)
-      Thread.sleep(100)
-    }
-  }
-
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
deleted file mode 100644
index 2a265d021d..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package spark.streaming.examples
-
-import spark.RDD
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-import scala.collection.mutable.SynchronizedQueue
-
-object QueueStream {
-
-  def main(args: Array[String]) {
-    if (args.length < 1) {
-      System.err.println("Usage: QueueStream <master>")
-      System.exit(1)
-    }
-
-    // Create the context
-    val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
-
-    // Create the queue through which RDDs can be pushed to
-    // a QueueInputDStream
-    val rddQueue = new SynchronizedQueue[RDD[Int]]()
-
-    // Create the QueueInputDStream and use it do some processing
-    val inputStream = ssc.queueStream(rddQueue)
-    val mappedStream = inputStream.map(x => (x % 10, 1))
-    val reducedStream = mappedStream.reduceByKey(_ + _)
-    reducedStream.print()
-    ssc.start()
-
-    // Create and push some RDDs into
-    for (i <- 1 to 30) {
-      rddQueue += ssc.sc.makeRDD(1 to 1000, 10)
-      Thread.sleep(1000)
-    }
-    ssc.stop()
-    System.exit(0)
-  }
-}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
deleted file mode 100644
index 338834bc3c..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object TopKWordCountRaw {
-
-  def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println("Usage: WordCountRaw <master> <# streams> <port> <checkpoint dir>")
-      System.exit(1)
-    }
-
-    val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
-    val k = 10
-
-    // Create the context, and set the checkpoint directory.
-    // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
-    // periodically to HDFS
-    val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
-    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
-    // Warm up the JVMs on master and slave for JIT compilation to kick in
-    /*warmUp(ssc.sc)*/
-
-    // Set up the raw network streams that will connect to localhost:port to raw test
-    // senders on the slaves and generate top K words of last 30 seconds
-    val lines = (1 to numStreams).map(_ => {
-      ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
-    })
-    val union = ssc.union(lines)
-    val counts = union.mapPartitions(splitAndCountPartitions)
-    val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
-    val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
-    partialTopKWindowedCounts.foreach(rdd => {
-      val collectedCounts = rdd.collect
-      println("Collected " + collectedCounts.size + " words from partial top words")
-      println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
-    })
-
-    ssc.start()
-  }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
deleted file mode 100644
index 867a8f42c4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountHdfs {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println("Usage: WordCountHdfs <master> <directory>")
-      System.exit(1)
-    }
-
-    // Create the context
-    val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
-
-    // Create the FileInputDStream on the directory and use the
-    // stream to count words in new files created
-    val lines = ssc.textFileStream(args(1))
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.print()
-    ssc.start()
-  }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
deleted file mode 100644
index eadda60563..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountNetwork {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
-        "In local mode, <master> should be 'local[n]' with n > 1")
-      System.exit(1)
-    }
-
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
-
-    // Create a NetworkInputDStream on target ip:port and count the
-    // words in input stream of \n delimited test (eg. generated by 'nc')
-    val lines = ssc.networkTextStream(args(1), args(2).toInt)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.print()
-    ssc.start()
-  }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
deleted file mode 100644
index d93335a8ce..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object WordCountRaw {
-
-  def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println("Usage: WordCountRaw <master> <# streams> <port> <checkpoint dir>")
-      System.exit(1)
-    }
-
-    val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
-
-    // Create the context, and set the checkpoint directory.
-    // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
-    // periodically to HDFS
-    val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
-    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
-    // Warm up the JVMs on master and slave for JIT compilation to kick in
-    warmUp(ssc.sc)
-
-    // Set up the raw network streams that will connect to localhost:port to raw test
-    // senders on the slaves and generate count of words of last 30 seconds
-    val lines = (1 to numStreams).map(_ => {
-      ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
-    })
-    val union = ssc.union(lines)
-    val counts = union.mapPartitions(splitAndCountPartitions)
-    val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
-    windowedCounts.foreach(r => println("# unique words = " + r.count()))
-
-    ssc.start()
-  }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
deleted file mode 100644
index 4c6e08bc74..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package spark.streaming.examples.clickstream
-
-import java.net.{InetAddress,ServerSocket,Socket,SocketException}
-import java.io.{InputStreamReader, BufferedReader, PrintWriter}
-import util.Random
-
-/** Represents a page view on a website with associated dimension data.*/
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
-  override def toString() : String = {
-    "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
-  }
-}
-object PageView {
-  def fromString(in : String) : PageView = {
-    val parts = in.split("\t")
-    new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
-  }
-}
-
-/** Generates streaming events to simulate page views on a website.
-  *
-  * This should be used in tandem with PageViewStream.scala. Example:
-  * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
-  * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
-  * */
-object PageViewGenerator {
-  val pages = Map("http://foo.com/" -> .7,
-                  "http://foo.com/news" -> 0.2,
-                  "http://foo.com/contact" -> .1)
-  val httpStatus = Map(200 -> .95,
-                       404 -> .05)
-  val userZipCode = Map(94709 -> .5,
-                        94117 -> .5)
-  val userID = Map((1 to 100).map(_ -> .01):_*)
-
-
-  def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
-    val rand = new Random().nextDouble()
-    var total = 0.0
-    for ((item, prob) <- inputMap) {
-      total = total + prob
-      if (total > rand) {
-        return item
-      }
-    }
-    return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
-  }
-
-  def getNextClickEvent() : String = {
-    val id = pickFromDistribution(userID)
-    val page = pickFromDistribution(pages)
-    val status = pickFromDistribution(httpStatus)
-    val zipCode = pickFromDistribution(userZipCode)
-    new PageView(page, status, zipCode, id).toString()
-  }
-
-  def main(args : Array[String]) {
-    if (args.length != 2) {
-      System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
-      System.exit(1)
-    }
-    val port = args(0).toInt
-    val viewsPerSecond = args(1).toFloat
-    val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
-    val listener = new ServerSocket(port)
-    println("Listening on port: " + port)
-
-    while (true) {
-      val socket = listener.accept()
-      new Thread() {
-        override def run = {
-          println("Got client connected from: " + socket.getInetAddress)
-          val out = new PrintWriter(socket.getOutputStream(), true)
-
-          while (true) {
-            Thread.sleep(sleepDelayMs)
-            out.write(getNextClickEvent())
-            out.flush()
-          }
-          socket.close()
-        }
-      }.start()
-    }
-  }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
deleted file mode 100644
index a191321d91..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package spark.streaming.examples.clickstream
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-import spark.SparkContext._
-
-/** Analyses a streaming dataset of web page views. This class demonstrates several types of
-  * operators available in Spark streaming.
-  *
-  * This should be used in tandem with PageViewStream.scala. Example:
-  * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
-  * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
-  * */
-object PageViewStream {
-  def main(args: Array[String]) {
-    if (args.length != 3) {
-      System.err.println("Usage: PageViewStream <metric> <host> <port>")
-      System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
-                         " errorRatePerZipCode, activeUserCount, popularUsersSeen")
-      System.exit(1)
-    }
-    val metric = args(0)
-    val host = args(1)
-    val port = args(2).toInt
-
-    // Create the context
-    val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
-
-    // Create a NetworkInputDStream on target host:port and convert each line to a PageView
-    val pageViews = ssc.networkTextStream(host, port)
-                       .flatMap(_.split("\n"))
-                       .map(PageView.fromString(_))
-
-    // Return a count of views per URL seen in each batch
-    val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
-
-    // Return a sliding window of page views per URL in the last ten seconds
-    val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
-                                     .window(Seconds(10), Seconds(2))
-                                     .countByKey()
-
-
-    // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
-    val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
-                                      .map(view => ((view.zipCode, view.status)))
-                                      .groupByKey()
-    val errorRatePerZipCode = statusesPerZipCode.map{
-      case(zip, statuses) =>
-        val normalCount = statuses.filter(_ == 200).size
-        val errorCount = statuses.size - normalCount
-        val errorRatio = errorCount.toFloat / statuses.size
-        if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
-        else {"%s: %s".format(zip, errorRatio)}
-    }
-
-    // Return the number unique users in last 15 seconds
-    val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
-                                   .map(view => (view.userID, 1))
-                                   .groupByKey()
-                                   .count()
-                                   .map("Unique active users: " + _)
-
-    // An external dataset we want to join to this stream
-    val userList = ssc.sc.parallelize(
-      Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
-
-    metric match {
-      case "pageCounts" => pageCounts.print()
-      case "slidingPageCounts" => slidingPageCounts.print()
-      case "errorRatePerZipCode" => errorRatePerZipCode.print()
-      case "activeUserCount" => activeUserCount.print()
-      case "popularUsersSeen" =>
-        // Look for users in our existing dataset and print it out if we have a match
-        pageViews.map(view => (view.userID, 1))
-          .foreach((rdd, time) => rdd.join(userList)
-            .map(_._2._2)
-            .take(10)
-            .foreach(u => println("Saw user %s at time %s".format(u, time))))
-      case _ => println("Invalid metric entered: " + metric)
-    }
-
-    ssc.start()
-  }
-}
--
cgit v1.2.3
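
Note on the destination of the move: because this view is limited to 'streaming', only the deletions are shown; the relocated copies in the examples sub-project are not visible here. For orientation, below is a minimal sketch of what one of these examples would look like on the examples side. It simply mirrors the deleted WordCountNetwork.scala above, and it assumes the package name (spark.streaming.examples) and a path such as examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala are kept after the move; both are assumptions, not something this diff shows.

// Hypothetical post-move location (not shown in this diff):
//   examples/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object WordCountNetwork {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
        "In local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    // One-second batches, as in the deleted streaming/ copy of this example
    val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))

    // Count words in \n-delimited text received on <hostname>:<port> (eg. generated by 'nc')
    val lines = ssc.networkTextStream(args(1), args(2).toInt)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
  }
}

Assuming the ./run launcher used by the clickstream examples above still picks up classes from the examples build after the move, it would be invoked the same way as before, for example: ./run spark.streaming.examples.WordCountNetwork local[2] localhost 9999 (the local[2] master and port 9999 are illustrative values, not taken from this patch).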