aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala94
1 files changed, 94 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
new file mode 100644
index 0000000000..a9642100e3
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -0,0 +1,94 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird._
+import spark.streaming.StreamingContext._
+import spark.SparkContext._
+
+/**
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ * the same approach could be used for computing popular topics for example.
+ * <p>
+ * <p>
+ * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
+ * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
+ * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
+ * percentage of the overall total count.
+ * <p><p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
+ */
+object TwitterAlgebirdCMS {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+
+ // CMS parameters
+ val DELTA = 1E-3
+ val EPS = 0.01
+ val SEED = 1
+ val PERC = 0.001
+ // K highest frequency elements to take
+ val TOPK = 10
+
+ val Array(master, username, password) = args.slice(0, 3)
+ val filters = args.slice(3, args.length)
+
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
+
+ val users = stream.map(status => status.getUser.getId)
+
+ val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
+ var globalCMS = cms.zero
+ val mm = new MapMonoid[Long, Int]()
+ var globalExact = Map[Long, Int]()
+
+ val approxTopUsers = users.mapPartitions(ids => {
+ ids.map(id => cms.create(id))
+ }).reduce(_ ++ _)
+
+ val exactTopUsers = users.map(id => (id, 1))
+ .reduceByKey((a, b) => a + b)
+
+ approxTopUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ val partialTopK = partial.heavyHitters.map(id =>
+ (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ globalCMS ++= partial
+ val globalTopK = globalCMS.heavyHitters.map(id =>
+ (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
+ partialTopK.mkString("[", ",", "]")))
+ println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
+ globalTopK.mkString("[", ",", "]")))
+ }
+ })
+
+ exactTopUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partialMap = rdd.collect().toMap
+ val partialTopK = rdd.map(
+ {case (id, count) => (count, id)})
+ .sortByKey(ascending = false).take(TOPK)
+ globalExact = mm.plus(globalExact.toMap, partialMap)
+ val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+ println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+ }
+ })
+
+ ssc.start()
+ }
+}