From 015893f0e8983a7e249709d9820d1bf0dd74d607 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Tue, 19 Feb 2013 13:21:33 +0200 Subject: Adding streaming HyperLogLog example using Algebird --- examples/pom.xml | 5 ++ .../streaming/examples/twitter/StreamingHLL.scala | 62 ++++++++++++++++++++++ project/SparkBuild.scala | 3 +- 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala diff --git a/examples/pom.xml b/examples/pom.xml index f43af670c6..28da3dbde4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -24,6 +24,11 @@ twitter4j-stream 3.0.3 + + com.twitter + algebird-core_2.9.2 + 0.1.8 + org.scalatest diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala new file mode 100644 index 0000000000..f67bb029c6 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala @@ -0,0 +1,62 @@ +package spark.streaming.examples.twitter + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird.HyperLogLog._ +import com.twitter.algebird.HyperLogLogMonoid + +/** + * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream + */ +object StreamingHLL { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterStreamingHLL " + + " [filter1] [filter2] ... 
[filter n]") + System.exit(1) + } + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2)) + val stream = new TwitterInputDStream(ssc, username, password, filters, + StorageLevel.MEMORY_ONLY_SER) + ssc.registerInputStream(stream) + + val users = stream.map(status => status.getUser.getId) + + val globalHll = new HyperLogLogMonoid(12) + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + val hll = new HyperLogLogMonoid(12) + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + var h = globalHll.zero + approxUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + h += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt)) + } + }) + + exactUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100)) + } + }) + + ssc.start() + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index af8b5ba017..18cc9ea90e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -156,7 +156,8 @@ object SparkBuild extends Build { def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", libraryDependencies ++= Seq( - "org.twitter4j" % "twitter4j-stream" % "3.0.3" + "org.twitter4j" % "twitter4j-stream" % "3.0.3", + "com.twitter" % "algebird-core_2.9.2" % "0.1.8" ) ) -- cgit v1.2.3 From d8ee184d950ac91ad4f617fc0d7b2f02ed6b74d3 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Tue, 19 Feb 2013 17:42:57 +0200 
Subject: Dependencies and refactoring for streaming HLL example, and using context.twitterStream method --- examples/pom.xml | 6 --- .../streaming/examples/TwitterAlgebirdHLL.scala | 62 +++++++++++++++++++++ .../streaming/examples/twitter/StreamingHLL.scala | 63 ---------------------- streaming/pom.xml | 10 ++++ 4 files changed, 72 insertions(+), 69 deletions(-) create mode 100644 examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala delete mode 100644 examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala diff --git a/examples/pom.xml b/examples/pom.xml index 28da3dbde4..7d975875fa 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -19,17 +19,11 @@ org.eclipse.jetty jetty-server - - org.twitter4j - twitter4j-stream - 3.0.3 - com.twitter algebird-core_2.9.2 0.1.8 - org.scalatest scalatest_${scala.version} diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala new file mode 100644 index 0000000000..c2095f5b94 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -0,0 +1,62 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird.HyperLogLog._ +import com.twitter.algebird.HyperLogLogMonoid +import spark.streaming.dstream.TwitterInputDStream + +/** + * Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream to compute approximate distinct counts of userids. + */ +object TwitterAlgebirdHLL { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterAlgebirdHLL " + + " [filter1] [filter2] ... 
[filter n]") + System.exit(1) + } + + /** Bit size parameter for HyperLogLog */ + val BIT_SIZE = 12 + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + val hll = new HyperLogLogMonoid(BIT_SIZE) + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + approxUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + globalHll += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) + } + }) + + exactUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100)) + } + }) + + ssc.start() + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala deleted file mode 100644 index 023a0add80..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala +++ /dev/null @@ -1,63 +0,0 @@ -package spark.streaming.examples.twitter - -import spark.streaming.{Seconds, StreamingContext} -import spark.storage.StorageLevel -import com.twitter.algebird.HyperLogLog._ -import com.twitter.algebird.HyperLogLogMonoid -import spark.streaming.dstream.TwitterInputDStream - 
-/** - * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's - * TwitterInputDStream - */ -object StreamingHLL { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: TwitterStreamingHLL " + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - - val Array(master, username, password) = args.slice(0, 3) - val filters = args.slice(3, args.length) - - val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters, - StorageLevel.MEMORY_ONLY_SER) - ssc.registerInputStream(stream) - - val users = stream.map(status => status.getUser.getId) - - val globalHll = new HyperLogLogMonoid(12) - var userSet: Set[Long] = Set() - - val approxUsers = users.mapPartitions(ids => { - val hll = new HyperLogLogMonoid(12) - ids.map(id => hll(id)) - }).reduce(_ + _) - - val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - - var h = globalHll.zero - approxUsers.foreach(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - h += partial - println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) - println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt)) - } - }) - - exactUsers.foreach(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - userSet ++= partial - println("Exact distinct users this batch: %d".format(partial.size)) - println("Exact distinct users overall: %d".format(userSet.size)) - println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100)) - } - }) - - ssc.start() - } -} diff --git a/streaming/pom.xml b/streaming/pom.xml index 6ee7e59df3..d78c39da0d 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -47,6 +47,16 @@ zkclient 0.1 + + org.twitter4j + twitter4j-stream + 3.0.3 + + + org.twitter4j + twitter4j-core + 3.0.3 + org.scalatest -- cgit v1.2.3 From 
8a281399f970db761ea05baf07972fff1c5bd058 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Tue, 19 Feb 2013 17:56:02 +0200 Subject: Streaming example using Twitter Algebird's Count Min Sketch monoid --- .../streaming/examples/TwitterAlgebirdCMS.scala | 78 ++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala new file mode 100644 index 0000000000..d50e86aff5 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -0,0 +1,78 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird._ +import spark.streaming.StreamingContext._ +import spark.SparkContext._ + +/** + * Example of using CountMinSketch monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream + */ +object TwitterAlgebirdCMS { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterAlgebirdCMS " + + " [filter1] [filter2] ... 
[filter n]") + System.exit(1) + } + + val DELTA = 1E-3 + val EPS = 0.01 + val SEED = 1 + val PERC = 0.001 + val TOPK = 10 + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) + val stream = ssc.twitterStream(username, password, filters, + StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + var globalCMS = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC).zero + var globalExact = Map[Long, Int]() + val mm = new MapMonoid[Long, Int]() + + val approxTopUsers = users.mapPartitions(ids => { + val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) + ids.map(id => cms.create(id)) + }).reduce(_ ++ _) + + val exactTopUsers = users.map(id => (id, 1)) + .reduceByKey((a, b) => a + b) + + approxTopUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + val partialTopK = partial.heavyHitters.map(id => + (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + globalCMS ++= partial + val globalTopK = globalCMS.heavyHitters.map(id => + (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, + partialTopK.mkString("[", ",", "]"))) + println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, + globalTopK.mkString("[", ",", "]"))) + } + }) + + exactTopUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partialMap = rdd.collect().toMap + val partialTopK = rdd.map( + {case (id, count) => (count, id)}) + .sortByKey(ascending = false).take(TOPK) + globalExact = mm.plus(globalExact.toMap, partialMap) + val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) + println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", 
"]"))) + } + }) + + ssc.start() + } +} -- cgit v1.2.3 From 718474b9c6ab985833af9cde69f13c4d33498f6d Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 21 Feb 2013 12:11:31 +0200 Subject: Bumping Algebird to 0.1.9 --- examples/pom.xml | 2 +- project/SparkBuild.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pom.xml b/examples/pom.xml index 7d975875fa..f6125444e2 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -22,7 +22,7 @@ com.twitter algebird-core_2.9.2 - 0.1.8 + 0.1.9 org.scalatest diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 090fd65bcb..030a60f5bd 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -155,7 +155,7 @@ object SparkBuild extends Build { def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", - libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8") + libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.9") ) def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") -- cgit v1.2.3 From d9bdae8cc249ee8f595a849c5a751caef75140c5 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 21 Feb 2013 12:31:31 +0200 Subject: Adding documentation for HLL and CMS examples. More efficient and clear use of the monoids. 
--- .../streaming/examples/TwitterAlgebirdCMS.scala | 29 ++++++++++++++++------ .../streaming/examples/TwitterAlgebirdHLL.scala | 19 ++++++++++---- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index d50e86aff5..39a1a702ee 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -7,8 +7,22 @@ import spark.streaming.StreamingContext._ import spark.SparkContext._ /** - * Example of using CountMinSketch monoid from Twitter's Algebird together with Spark Streaming's - * TwitterInputDStream + * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute + * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. + *
+ Note that since Algebird's implementation currently only supports Long inputs, + * the example operates on Long IDs. Once the implementation supports other inputs (such as String), + * the same approach could be used to compute popular topics, for example. + *

+ *

+ * + * This blog post has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure + * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc.) + * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the + * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold + * percentage of the overall total count. + *

+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation. */ object TwitterAlgebirdCMS { def main(args: Array[String]) { @@ -18,27 +32,28 @@ object TwitterAlgebirdCMS { System.exit(1) } + // CMS parameters val DELTA = 1E-3 val EPS = 0.01 val SEED = 1 val PERC = 0.001 + // K highest frequency elements to take val TOPK = 10 val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) - val stream = ssc.twitterStream(username, password, filters, - StorageLevel.MEMORY_ONLY_SER) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) - var globalCMS = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC).zero - var globalExact = Map[Long, Int]() + val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) + var globalCMS = cms.zero val mm = new MapMonoid[Long, Int]() + var globalExact = Map[Long, Int]() val approxTopUsers = users.mapPartitions(ids => { - val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) ids.map(id => cms.create(id)) }).reduce(_ ++ _) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index c2095f5b94..914fba4ca2 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -7,8 +7,17 @@ import com.twitter.algebird.HyperLogLogMonoid import spark.streaming.dstream.TwitterInputDStream /** - * Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's - * TwitterInputDStream to compute approximate distinct counts of userids. 
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute + * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. + *

+ *

+ * This + * blog post and this + * blog post + * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for estimating + * the cardinality of a data stream, i.e. the number of unique elements. + *

+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation. */ object TwitterAlgebirdHLL { def main(args: Array[String]) { @@ -18,7 +27,7 @@ object TwitterAlgebirdHLL { System.exit(1) } - /** Bit size parameter for HyperLogLog */ + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ val BIT_SIZE = 12 val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) @@ -28,11 +37,11 @@ object TwitterAlgebirdHLL { val users = stream.map(status => status.getUser.getId) - var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero + val hll = new HyperLogLogMonoid(BIT_SIZE) + var globalHll = hll.zero var userSet: Set[Long] = Set() val approxUsers = users.mapPartitions(ids => { - val hll = new HyperLogLogMonoid(BIT_SIZE) ids.map(id => hll(id)) }).reduce(_ + _) -- cgit v1.2.3