diff options
author | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-19 17:56:02 +0200 |
---|---|---|
committer | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-19 17:56:02 +0200 |
commit | 8a281399f970db761ea05baf07972fff1c5bd058 (patch) | |
tree | 1e28254e0089ba32e4a237ad6525e60889dff13a /examples/src | |
parent | d8ee184d950ac91ad4f617fc0d7b2f02ed6b74d3 (diff) | |
download | spark-8a281399f970db761ea05baf07972fff1c5bd058.tar.gz spark-8a281399f970db761ea05baf07972fff1c5bd058.tar.bz2 spark-8a281399f970db761ea05baf07972fff1c5bd058.zip |
Streaming example using Twitter Algebird's Count Min Sketch monoid
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala | 78 |
1 file changed, 78 insertions, 0 deletions
package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird._
import spark.streaming.StreamingContext._
import spark.SparkContext._

/**
 * Example of using the CountMinSketch monoid from Twitter's Algebird together with Spark
 * Streaming's TwitterInputDStream.
 *
 * Maintains both an approximate (Count-Min Sketch) and an exact count of user IDs seen in
 * the stream, and prints per-batch and running top-K "heavy hitters" for each so the two
 * can be compared.
 *
 * Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password> [filter ...]
 */
object TwitterAlgebirdCMS {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
        " [filter1] [filter2] ... [filter n]")
      System.exit(1)
    }

    // CMS parameters:
    //   EPS   - relative error bound of a frequency estimate
    //   DELTA - probability an estimate exceeds that error bound
    //   SEED  - seed for the sketch's hash functions
    //   PERC  - heavy-hitters threshold, as a fraction of the total count
    //   TOPK  - number of top heavy hitters to print
    val DELTA = 1E-3
    val EPS = 0.01
    val SEED = 1
    val PERC = 0.001
    val TOPK = 10

    val Array(master, username, password) = args.slice(0, 3)
    val filters = args.slice(3, args.length)

    val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
    val stream = ssc.twitterStream(username, password, filters,
      StorageLevel.MEMORY_ONLY_SER)

    val users = stream.map(status => status.getUser.getId)

    // Running state accumulated across batches (vars are required here since foreach
    // closures update them as each RDD arrives).
    // BUG FIX: Algebird's CountMinSketchMonoid signature is
    // (eps, delta, seed, heavyHittersPct) — the original passed (DELTA, EPS, ...),
    // swapping the error bound and the failure probability.
    var globalCMS = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC).zero
    var globalExact = Map[Long, Int]()
    val mm = new MapMonoid[Long, Int]()

    // Approximate counts: build one sketch per partition, then merge sketches with ++.
    val approxTopUsers = users.mapPartitions(ids => {
      val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
      ids.map(id => cms.create(id))
    }).reduce(_ ++ _)

    // Exact counts for comparison against the sketch's estimates.
    val exactTopUsers = users.map(id => (id, 1))
      .reduceByKey((a, b) => a + b)

    approxTopUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        val partial = rdd.first()
        val partialTopK = partial.heavyHitters.map(id =>
          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
        // Fold this batch's sketch into the running global sketch.
        globalCMS ++= partial
        val globalTopK = globalCMS.heavyHitters.map(id =>
          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
          partialTopK.mkString("[", ",", "]")))
        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
          globalTopK.mkString("[", ",", "]")))
      }
    })

    exactTopUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        val partialMap = rdd.collect().toMap
        val partialTopK = rdd.map(
          {case (id, count) => (count, id)})
          .sortByKey(ascending = false).take(TOPK)
        // Merge this batch's exact counts into the running totals via the map monoid.
        globalExact = mm.plus(globalExact.toMap, partialMap)
        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
      }
    })

    ssc.start()
  }
}