diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-22 12:29:04 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-22 12:29:04 -0800 |
commit | cfa65ebff1e01fc55f02c7289866a791b11d756e (patch) | |
tree | a6413114079a39755ca8122c9cb11b5fdd940e50 /examples | |
parent | 688e62718fd547ed46a90f7ac57c39df35866e6b (diff) | |
parent | d9bdae8cc249ee8f595a849c5a751caef75140c5 (diff) | |
download | spark-cfa65ebff1e01fc55f02c7289866a791b11d756e.tar.gz spark-cfa65ebff1e01fc55f02c7289866a791b11d756e.tar.bz2 spark-cfa65ebff1e01fc55f02c7289866a791b11d756e.zip |
Merge pull request #480 from MLnick/streaming-eg-algebird
[Streaming] Examples using Twitter's Algebird library
Diffstat (limited to 'examples')
-rw-r--r-- | examples/pom.xml | 7 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala | 93 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala | 71 |
3 files changed, 167 insertions, 4 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index f43af670c6..f6125444e2 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -20,11 +20,10 @@ <artifactId>jetty-server</artifactId> </dependency> <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-stream</artifactId> - <version>3.0.3</version> + <groupId>com.twitter</groupId> + <artifactId>algebird-core_2.9.2</artifactId> + <version>0.1.9</version> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala new file mode 100644 index 0000000000..39a1a702ee --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -0,0 +1,93 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird._ +import spark.streaming.StreamingContext._ +import spark.SparkContext._ + +/** + * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute + * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. + * <br> + * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs, + * the example operates on Long IDs. Once the implementation supports other inputs (such as String), + * the same approach could be used for computing popular topics for example. + * <p> + * <p> + * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> + * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure + * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc), + * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the + * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold + * percentage of the overall total count. + * <p><p> + * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation. + */ +object TwitterAlgebirdCMS { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + // CMS parameters + val DELTA = 1E-3 + val EPS = 0.01 + val SEED = 1 + val PERC = 0.001 + // K highest frequency elements to take + val TOPK = 10 + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) + var globalCMS = cms.zero + val mm = new MapMonoid[Long, Int]() + var globalExact = Map[Long, Int]() + + val approxTopUsers = users.mapPartitions(ids => { + ids.map(id => cms.create(id)) + }).reduce(_ ++ _) + + val exactTopUsers = users.map(id => (id, 1)) + .reduceByKey((a, b) => a + b) + + approxTopUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + val partialTopK = partial.heavyHitters.map(id => + (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + globalCMS ++= partial + val globalTopK = globalCMS.heavyHitters.map(id => + (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, + partialTopK.mkString("[", ",", "]"))) + println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, + globalTopK.mkString("[", ",", "]"))) + } + }) + + exactTopUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partialMap = rdd.collect().toMap + val partialTopK = rdd.map( + {case (id, count) => (count, id)}) + .sortByKey(ascending = false).take(TOPK) + globalExact = mm.plus(globalExact.toMap, partialMap) + val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) + println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) + } + }) + + ssc.start() + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala new file mode 100644 index 0000000000..914fba4ca2 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -0,0 +1,71 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird.HyperLogLog._ +import com.twitter.algebird.HyperLogLogMonoid +import spark.streaming.dstream.TwitterInputDStream + +/** + * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute + * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. + * <p> + * <p> + * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> + * blog post</a> and this + * <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a> + * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating + * the cardinality of a data stream, i.e. the number of unique elements. + * <p><p> + * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation. + */ +object TwitterAlgebirdHLL { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ + val BIT_SIZE = 12 + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + val hll = new HyperLogLogMonoid(BIT_SIZE) + var globalHll = hll.zero + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + approxUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + globalHll += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) + } + }) + + exactUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100)) + } + }) + + ssc.start() + } +} |