From 720cb0f46736105d200128f13081489281dbe118 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Wed, 14 Nov 2012 18:08:02 -0800
Subject: A "streaming page view" example.

---
 .../examples/clickstream/PageViewGenerator.scala   | 85 ++++++++++++++++++++++
 .../examples/clickstream/PageViewStream.scala      | 85 ++++++++++++++++++++++
 2 files changed, 170 insertions(+)
 create mode 100644 streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
 create mode 100644 streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala

diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
new file mode 100644
index 0000000000..4c6e08bc74
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -0,0 +1,85 @@
+package spark.streaming.examples.clickstream
+
+import java.net.{InetAddress,ServerSocket,Socket,SocketException}
+import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import util.Random
+
+/** Represents a page view on a website with associated dimension data.*/
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+  override def toString() : String = {
+    "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
+  }
+}
+object PageView {
+  def fromString(in : String) : PageView = {
+    val parts = in.split("\t")
+    new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
+  }
+}
+
+/** Generates streaming events to simulate page views on a website.
+  *
+  * This should be used in tandem with PageViewStream.scala. Example:
+  * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+  * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+  * */
+object PageViewGenerator {
+  val pages = Map("http://foo.com/"        -> .7,
+                  "http://foo.com/news"    -> 0.2,
+                  "http://foo.com/contact" -> .1)
+  val httpStatus = Map(200 -> .95,
+                       404 -> .05)
+  val userZipCode = Map(94709 -> .5,
+                        94117 -> .5)
+  val userID = Map((1 to 100).map(_ -> .01):_*)
+
+
+  def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+    val rand = new Random().nextDouble()
+    var total = 0.0
+    for ((item, prob) <- inputMap) {
+      total = total + prob
+      if (total > rand) {
+        return item
+      }
+    }
+    return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+  }
+
+  def getNextClickEvent() : String = {
+    val id = pickFromDistribution(userID)
+    val page = pickFromDistribution(pages)
+    val status = pickFromDistribution(httpStatus)
+    val zipCode = pickFromDistribution(userZipCode)
+    new PageView(page, status, zipCode, id).toString()
+  }
+
+  def main(args : Array[String]) {
+    if (args.length != 2) {
+      System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
+      System.exit(1)
+    }
+    val port = args(0).toInt
+    val viewsPerSecond = args(1).toFloat
+    val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
+    val listener = new ServerSocket(port)
+    println("Listening on port: " + port)
+
+    while (true) {
+      val socket = listener.accept()
+      new Thread() {
+        override def run = {
+          println("Got client connected from: " + socket.getInetAddress)
+          val out = new PrintWriter(socket.getOutputStream(), true)
+
+          while (true) {
+            Thread.sleep(sleepDelayMs)
+            out.write(getNextClickEvent())
+            out.flush()
+          }
+          socket.close()
+        }
+      }.start()
+    }
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
new file mode 100644
index 0000000000..1a51fb66cd
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -0,0 +1,85 @@
+package spark.streaming.examples.clickstream
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+import spark.SparkContext._
+
+/** Analyses a streaming dataset of web page views. This class demonstrates several types of
+  * operators available in Spark streaming.
+  *
+  * This should be used in tandem with PageViewStream.scala. Example:
+  * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+  * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+  * */
+object PageViewStream {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: PageViewStream <metric> <host> <port>")
+      System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
+                         " errorRatePerZipCode, activeUserCount, popularUsersSeen")
+      System.exit(1)
+    }
+    val metric = args(0)
+    val host = args(1)
+    val port = args(2).toInt
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext("local[2]", "PageViewStream")
+    ssc.setBatchDuration(Seconds(1))
+
+    // Create a NetworkInputDStream on target host:port and convert each line to a PageView
+    val pageViews = ssc.networkTextStream(host, port)
+                       .flatMap(_.split("\n"))
+                       .map(PageView.fromString(_))
+
+    // Return a count of views per URL seen in each batch
+    val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+
+    // Return a sliding window of page views per URL in the last ten seconds
+    val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
+                                     .window(Seconds(10), Seconds(2))
+                                     .countByKey()
+
+
+    // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
+    val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
+                                      .map(view => ((view.zipCode, view.status)))
+                                      .groupByKey()
+    val errorRatePerZipCode = statusesPerZipCode.map{
+      case(zip, statuses) =>
+        val normalCount = statuses.filter(_ == 200).size
+        val errorCount = statuses.size - normalCount
+        val errorRatio = errorCount.toFloat / statuses.size
+        if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
+        else {"%s: %s".format(zip, errorRatio)}
+    }
+
+    // Return the number unique users in last 15 seconds
+    val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
+                                   .map(view => (view.userID, 1))
+                                   .groupByKey()
+                                   .count()
+                                   .map("Unique active users: " + _)
+
+    // An external dataset we want to join to this stream
+    val userList = ssc.sc.parallelize(
+       Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
+
+    metric match {
+      case "pageCounts" => pageCounts.print()
+      case "slidingPageCounts" => slidingPageCounts.print()
+      case "errorRatePerZipCode" => errorRatePerZipCode.print()
+      case "activeUserCount" => activeUserCount.print()
+      case "popularUsersSeen" =>
+        // Look for users in our existing dataset and print it out if we have a match
+        pageViews.map(view => (view.userID, 1))
+          .foreachRDD((rdd, time) => rdd.join(userList)
+            .map(_._2._2)
+            .take(10)
+            .foreach(u => println("Saw user %s at time %s".format(u, time))))
+      case _ => println("Invalid metric entered: " + metric)
+    }
+
+    ssc.start()
+  }
+}
--
cgit v1.2.3
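To sanity-check the generator above without starting a Spark streaming job, a plain socket client can be pointed at the same port. The sketch below is not part of the patch; it assumes the generator is already running on localhost:44444 as in the example commands, and that each event arrives as one tab-delimited PageView line (url, status, zipCode, userID):

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket

    /** Connects to a running PageViewGenerator and echoes a few raw events. */
    object ClickStreamPeek {
      def main(args: Array[String]) {
        val socket = new Socket("localhost", 44444)   // port used in the example commands
        val in = new BufferedReader(new InputStreamReader(socket.getInputStream))
        // Each line is one tab-delimited PageView: url, status, zipCode, userID
        for (i <- 1 to 5) {
          val line = in.readLine()
          if (line != null) println(line)
        }
        socket.close()
      }
    }

Run while PageViewGenerator is up and it should print a handful of lines in the format produced by PageView.toString above.
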
From 10c1abcb6ac42b248818fa585a9ad49c2fa4851a Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sat, 17 Nov 2012 17:27:00 -0800
Subject: Fixed checkpointing bug in CoGroupedRDD. CoGroupSplits kept around the
 RDD splits of its parent RDDs, thus checkpointing its parents did not release
 the references to the parent splits.

---
 core/src/main/scala/spark/rdd/CoGroupedRDD.scala    | 18 ++++++++++----
 core/src/test/scala/spark/CheckpointSuite.scala     | 28 ++++++++++++++++++++++
 .../src/main/scala/spark/streaming/DStream.scala    |  4 ++--
 .../main/scala/spark/streaming/DStreamGraph.scala   |  2 +-
 4 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index a313ebcbe8..94ef1b56e8 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -12,9 +12,20 @@ import spark.RDD
 import spark.ShuffleDependency
 import spark.SparkEnv
 import spark.Split
+import java.io.{ObjectOutputStream, IOException}
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
-private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
+private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], splitIndex: Int, var split: Split = null)
+  extends CoGroupSplitDep {
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    rdd.synchronized {
+      // Update the reference to parent split at the time of task serialization
+      split = rdd.splits(splitIndex)
+      oos.defaultWriteObject()
+    }
+  }
+}
 private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
 
 private[spark]
@@ -55,7 +66,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
 
   @transient
   var splits_ : Array[Split] = {
-    val firstRdd = rdds.head
     val array = new Array[Split](part.numPartitions)
     for (i <- 0 until array.size) {
       array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
@@ -63,7 +73,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
           case s: ShuffleDependency[_, _] =>
             new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
           case _ =>
-            new NarrowCoGroupSplitDep(r, r.splits(i)): CoGroupSplitDep
+            new NarrowCoGroupSplitDep(r, i): CoGroupSplitDep
         }
       }.toList)
     }
@@ -82,7 +92,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
       map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any]))
     }
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
-      case NarrowCoGroupSplitDep(rdd, itsSplit) => {
+      case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
         // Read them from the parent
         for ((k, v) <- rdd.iterator(itsSplit)) {
           getSeq(k.asInstanceOf[K])(depNum) += v
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 57dc43ddac..8622ce92aa 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -6,6 +6,7 @@ import rdd.{BlockRDD, CoalescedRDD, MapPartitionsWithSplitRDD}
 import spark.SparkContext._
 import storage.StorageLevel
 import java.util.concurrent.Semaphore
+import collection.mutable.ArrayBuffer
 
 class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
   initLogging()
@@ -92,6 +93,33 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
     val rdd2 = sc.makeRDD(5 to 6, 4).map(x => (x % 2, 1))
     testCheckpointing(rdd1 => rdd1.map(x => (x % 2, 1)).cogroup(rdd2))
     testCheckpointing(rdd1 => rdd1.map(x => (x % 2, x)).join(rdd2))
+
+    // Special test to make sure that the CoGroupSplit of CoGroupedRDD do not
+    // hold on to the splits of its parent RDDs, as the splits of parent RDDs
+    // may change while checkpointing. Rather the splits of parent RDDs must
+    // be fetched at the time of serialization to ensure the latest splits to
+    // be sent along with the task.
+
+    val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
+
+    val ones = sc.parallelize(1 to 100, 1).map(x => (x,1))
+    val reduced = ones.reduceByKey(_ + _)
+    val seqOfCogrouped = new ArrayBuffer[RDD[(Int, Int)]]()
+    seqOfCogrouped += reduced.cogroup(ones).mapValues[Int](add)
+    for(i <- 1 to 10) {
+      seqOfCogrouped += seqOfCogrouped.last.cogroup(ones).mapValues(add)
+    }
+    val finalCogrouped = seqOfCogrouped.last
+    val intermediateCogrouped = seqOfCogrouped(5)
+
+    val bytesBeforeCheckpoint = Utils.serialize(finalCogrouped.splits)
+    intermediateCogrouped.checkpoint()
+    finalCogrouped.count()
+    sleep(intermediateCogrouped)
+    val bytesAfterCheckpoint = Utils.serialize(finalCogrouped.splits)
+    println("Before = " + bytesBeforeCheckpoint.size + ", after = " + bytesAfterCheckpoint.size)
+    assert(bytesAfterCheckpoint.size < bytesBeforeCheckpoint.size,
+      "CoGroupedSplits still holds on to the splits of its parent RDDs")
   }
 
   /**
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 76cdf8c464..13770aa8fd 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -226,11 +226,11 @@ extends Serializable with Logging {
           case Some(newRDD) =>
             if (storageLevel != StorageLevel.NONE) {
               newRDD.persist(storageLevel)
-              logInfo("Persisting RDD for time " + time + " to " + storageLevel + " at time " + time)
+              logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
             }
             if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
               newRDD.checkpoint()
-              logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time)
+              logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
             }
             generatedRDDs.put(time, newRDD)
             Some(newRDD)
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index 246522838a..bd8c033eab 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -105,7 +105,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private[streaming] def validate() {
     this.synchronized {
       assert(batchDuration != null, "Batch duration has not been set")
-      assert(batchDuration > Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+      //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
       assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
     }
  }
--
cgit v1.2.3
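The heart of the CoGroupedRDD fix is the writeObject hook: NarrowCoGroupSplitDep now carries only the split index and re-resolves the parent's Split when the task is serialized, so a stale Split captured before checkpointing is never shipped. Below is a minimal standalone sketch of that late-binding pattern, using hypothetical Parent/Piece stand-ins rather than Spark's RDD/Split API:

    import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}

    // Hypothetical stand-ins for RDD and Split, only to illustrate the pattern.
    class Piece(val index: Int) extends Serializable
    class Parent extends Serializable {
      @transient var pieces: Array[Piece] = Array.tabulate(4)(i => new Piece(i))
      // Simulates the parent's splits being replaced, e.g. after checkpointing.
      def replacePieces() { pieces = Array.tabulate(4)(i => new Piece(i)) }
    }

    // Carries only the index; the actual Piece is looked up when the task is serialized.
    case class NarrowDep(parent: Parent, pieceIndex: Int, var piece: Piece = null)
      extends Serializable {
      @throws(classOf[IOException])
      private def writeObject(oos: ObjectOutputStream) {
        parent.synchronized {
          piece = parent.pieces(pieceIndex)   // bind to the *current* piece, not a stale one
          oos.defaultWriteObject()
        }
      }
    }

    object LateBindingDemo {
      def main(args: Array[String]) {
        val parent = new Parent
        val dep = NarrowDep(parent, 2)        // no Piece captured at construction time
        parent.replacePieces()                // parent's pieces change later on
        new ObjectOutputStream(new ByteArrayOutputStream).writeObject(dep)
        println("Bound at serialization time to piece " + dep.piece.index)
      }
    }

Serializing dep only touches parent.pieces(pieceIndex) inside writeObject, which mirrors why CoGroupSplits no longer pin their parents' old splits once a parent has been checkpointed.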