author     Patrick Wendell <pwendell@gmail.com>    2012-11-14 18:08:02 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2012-11-16 12:11:22 -0800
commit     720cb0f46736105d200128f13081489281dbe118 (patch)
tree       56a668f3ba4d57813d069715f8b2cf57de0fec48 /streaming
parent     596154eabe51961733789a18a47067748fb72e8e (diff)
A "streaming page view" example.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala  85
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala      85
2 files changed, 170 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
new file mode 100644
index 0000000000..4c6e08bc74
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -0,0 +1,85 @@
+package spark.streaming.examples.clickstream
+
+import java.net.{InetAddress,ServerSocket,Socket,SocketException}
+import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import util.Random
+
+/** Represents a page view on a website with associated dimension data.*/
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+ override def toString() : String = {
+ "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
+ }
+}
+object PageView {
+ def fromString(in : String) : PageView = {
+ val parts = in.split("\t")
+ new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
+ }
+}
+
+/** Generates streaming events to simulate page views on a website.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * */
+object PageViewGenerator {
+ val pages = Map("http://foo.com/" -> 0.7,
+ "http://foo.com/news" -> 0.2,
+ "http://foo.com/contact" -> 0.1)
+ val httpStatus = Map(200 -> 0.95,
+ 404 -> 0.05)
+ val userZipCode = Map(94709 -> 0.5,
+ 94117 -> 0.5)
+ val userID = Map((1 to 100).map(_ -> 0.01):_*)
+
+
+ def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+ val rand = new Random().nextDouble()
+ var total = 0.0
+ for ((item, prob) <- inputMap) {
+ total = total + prob
+ if (total > rand) {
+ return item
+ }
+ }
+ return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+ }
+
+ def getNextClickEvent() : String = {
+ val id = pickFromDistribution(userID)
+ val page = pickFromDistribution(pages)
+ val status = pickFromDistribution(httpStatus)
+ val zipCode = pickFromDistribution(userZipCode)
+ new PageView(page, status, zipCode, id).toString()
+ }
+
+ def main(args : Array[String]) {
+ if (args.length != 2) {
+ System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
+ System.exit(1)
+ }
+ val port = args(0).toInt
+ val viewsPerSecond = args(1).toFloat
+ val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
+ val listener = new ServerSocket(port)
+ println("Listening on port: " + port)
+
+ while (true) {
+ val socket = listener.accept()
+ new Thread() {
+ override def run = {
+ println("Got client connected from: " + socket.getInetAddress)
+ val out = new PrintWriter(socket.getOutputStream(), true)
+
+ while (true) {
+ Thread.sleep(sleepDelayMs)
+ out.write(getNextClickEvent())
+ out.flush()
+ }
+ socket.close()
+ }
+ }.start()
+ }
+ }
+}
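A quick way to sanity-check the generator, independent of Spark, is to connect a plain socket client and feed each line through PageView.fromString. The sketch below is not part of the commit; PageViewClientCheck is a hypothetical helper that assumes the generator is already running on localhost:44444 and that the object sits in the same spark.streaming.examples.clickstream package so PageView is visible.

package spark.streaming.examples.clickstream

import java.net.Socket
import java.io.{BufferedReader, InputStreamReader}

/** Hypothetical checker, not part of this commit: connects to a running PageViewGenerator
  * and parses a few tab-separated lines back into PageView objects. */
object PageViewClientCheck {
  def main(args: Array[String]) {
    val socket = new Socket("localhost", 44444)  // assumes the generator was started on port 44444
    val in = new BufferedReader(new InputStreamReader(socket.getInputStream()))
    for (_ <- 1 to 5) {
      val line = in.readLine()                   // one "url\tstatus\tzipCode\tuserID" record per line
      val view = PageView.fromString(line)
      println("url=%s status=%d zip=%d user=%d".format(view.url, view.status, view.zipCode, view.userID))
    }
    socket.close()
  }
}

Each printed record should reflect the probability tables above, e.g. roughly 95% of views carrying status 200.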
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
new file mode 100644
index 0000000000..1a51fb66cd
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -0,0 +1,85 @@
+package spark.streaming.examples.clickstream
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+import spark.SparkContext._
+
+/** Analyses a streaming dataset of web page views. This object demonstrates several types of
+ * operators available in Spark Streaming.
+ *
+ * This should be used in tandem with PageViewGenerator.scala. Example:
+ * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * */
+object PageViewStream {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: PageViewStream <metric> <host> <port>")
+ System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
+ " errorRatePerZipCode, activeUserCount, popularUsersSeen")
+ System.exit(1)
+ }
+ val metric = args(0)
+ val host = args(1)
+ val port = args(2).toInt
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext("local[2]", "PageViewStream")
+ ssc.setBatchDuration(Seconds(1))
+
+ // Create a NetworkInputDStream on target host:port and convert each line to a PageView
+ val pageViews = ssc.networkTextStream(host, port)
+ .flatMap(_.split("\n"))
+ .map(PageView.fromString(_))
+
+ // Return a count of views per URL seen in each batch
+ val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+
+ // Return a sliding window of page views per URL in the last ten seconds
+ val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
+ .window(Seconds(10), Seconds(2))
+ .countByKey()
+
+
+ // Return the rate of error pages (a non-200 status) in each zip code over the last 30 seconds
+ val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
+ .map(view => ((view.zipCode, view.status)))
+ .groupByKey()
+ val errorRatePerZipCode = statusesPerZipCode.map{
+ case(zip, statuses) =>
+ val normalCount = statuses.filter(_ == 200).size
+ val errorCount = statuses.size - normalCount
+ val errorRatio = errorCount.toFloat / statuses.size
+ if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
+ else {"%s: %s".format(zip, errorRatio)}
+ }
+
+ // Return the number of unique users seen in the last 15 seconds
+ val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
+ .map(view => (view.userID, 1))
+ .groupByKey()
+ .count()
+ .map("Unique active users: " + _)
+
+ // An external dataset we want to join to this stream
+ val userList = ssc.sc.parallelize(
+ Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
+
+ metric match {
+ case "pageCounts" => pageCounts.print()
+ case "slidingPageCounts" => slidingPageCounts.print()
+ case "errorRatePerZipCode" => errorRatePerZipCode.print()
+ case "activeUserCount" => activeUserCount.print()
+ case "popularUsersSeen" =>
+ // Look for users in our existing dataset and print it out if we have a match
+ pageViews.map(view => (view.userID, 1))
+ .foreachRDD((rdd, time) => rdd.join(userList)
+ .map(_._2._2)
+ .take(10)
+ .foreach(u => println("Saw user %s at time %s".format(u, time))))
+ case _ => println("Invalid metric entered: " + metric)
+ }
+
+ ssc.start()
+ }
+}
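The windowed errorRatePerZipCode pipeline boils down to a groupByKey followed by a per-key ratio. The standalone sketch below is not part of the commit; it replays the same per-zip-code calculation over a hard-coded Scala collection (hypothetical sample data), which makes the flagging threshold easy to eyeball without starting a StreamingContext.

object ErrorRateSketch {
  def main(args: Array[String]) {
    // (zipCode, httpStatus) pairs, as the map step over a 30-second window would produce them
    val statuses = Seq((94709, 200), (94709, 404), (94709, 200), (94117, 200), (94117, 200))
    // group statuses by zip code, mirroring groupByKey on the windowed DStream
    val perZip = statuses.groupBy(_._1).mapValues(_.map(_._2))
    for ((zip, codes) <- perZip) {
      val normalCount = codes.count(_ == 200)
      val errorRatio = (codes.size - normalCount).toFloat / codes.size
      if (errorRatio > 0.05) println("%s: **%s**".format(zip, errorRatio))
      else println("%s: %s".format(zip, errorRatio))
    }
  }
}

Running it prints one line per zip code, with over-threshold rates wrapped in ** just as the streaming job formats them.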