diff options
author | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-21 12:31:31 +0200 |
---|---|---|
committer | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-21 12:31:31 +0200 |
commit | d9bdae8cc249ee8f595a849c5a751caef75140c5 (patch) | |
tree | 075ae972495ed7f0ca10cfed5e5d39d5ea5a2b1a | |
parent | 718474b9c6ab985833af9cde69f13c4d33498f6d (diff) | |
download | spark-d9bdae8cc249ee8f595a849c5a751caef75140c5.tar.gz spark-d9bdae8cc249ee8f595a849c5a751caef75140c5.tar.bz2 spark-d9bdae8cc249ee8f595a849c5a751caef75140c5.zip |
Adding documentation for HLL and CMS examples. More efficient and clear use of the monoids.
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala | 29 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala | 19 |
2 files changed, 36 insertions, 12 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index d50e86aff5..39a1a702ee 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -7,8 +7,22 @@ import spark.streaming.StreamingContext._ import spark.SparkContext._ /** - * Example of using CountMinSketch monoid from Twitter's Algebird together with Spark Streaming's - * TwitterInputDStream + * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute + * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. + * <br> + * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs, + * the example operates on Long IDs. Once the implementation supports other inputs (such as String), + * the same approach could be used for computing popular topics for example. + * <p> + * <p> + * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> + * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure + * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc), + * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the + * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold + * percentage of the overall total count. + * <p><p> + * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
 */ object TwitterAlgebirdCMS { def main(args: Array[String]) { @@ -18,27 +32,28 @@ object TwitterAlgebirdCMS { System.exit(1) } + // CMS parameters val DELTA = 1E-3 val EPS = 0.01 val SEED = 1 val PERC = 0.001 + // K highest frequency elements to take val TOPK = 10 val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) - val stream = ssc.twitterStream(username, password, filters, - StorageLevel.MEMORY_ONLY_SER) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) - var globalCMS = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC).zero - var globalExact = Map[Long, Int]() + val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) + var globalCMS = cms.zero val mm = new MapMonoid[Long, Int]() + var globalExact = Map[Long, Int]() val approxTopUsers = users.mapPartitions(ids => { - val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) ids.map(id => cms.create(id)) }).reduce(_ ++ _) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index c2095f5b94..914fba4ca2 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -7,8 +7,17 @@ import com.twitter.algebird.HyperLogLogMonoid import spark.streaming.dstream.TwitterInputDStream /** - * Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's - * TwitterInputDStream to compute approximate distinct counts of userids. + * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute + * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p> + * <p> + * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> + * blog post</a> and this + * <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a> + * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating + * the cardinality of a data stream, i.e. the number of unique elements. + * <p><p> + * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation. */ object TwitterAlgebirdHLL { def main(args: Array[String]) { @@ -18,7 +27,7 @@ object TwitterAlgebirdHLL { System.exit(1) } - /** Bit size parameter for HyperLogLog */ + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ val BIT_SIZE = 12 val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) @@ -28,11 +37,11 @@ object TwitterAlgebirdHLL { val users = stream.map(status => status.getUser.getId) - var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero + val hll = new HyperLogLogMonoid(BIT_SIZE) + var globalHll = hll.zero var userSet: Set[Long] = Set() val approxUsers = users.mapPartitions(ids => { - val hll = new HyperLogLogMonoid(BIT_SIZE) ids.map(id => hll(id)) }).reduce(_ + _) |