/*
 * Provenance: reconstructed from the patch for commit
 * 015893f0e8983a7e249709d9820d1bf0dd74d607 by Nick Pentreath
 * (Tue, 19 Feb 2013 13:21:33 +0200), "Adding streaming HyperLogLog example using Algebird".
 * Target path: examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
 *
 * The same commit adds a dependency to examples/pom.xml, listed in the patch as
 * com.twitter / algebird-core_2.9.2 / 0.1.8 (groupId / artifactId / version). The
 * com.twitter.algebird imports below rely on it.
 */
package spark.streaming.examples.twitter

import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid

/**
 * Example of using the HyperLogLog monoid from Twitter's Algebird together with Spark
 * Streaming's TwitterInputDStream.
 *
 * For each batch of statuses the example prints
 *  - an approximate distinct-user count (per batch and accumulated) computed with HyperLogLog,
 *  - an exact distinct-user count (per batch and accumulated) computed with a plain Set, and
 *  - the relative error of the accumulated approximation against the accumulated exact count.
 *
 * Usage: TwitterStreamingHLL <master> <twitter_username> <password> [filter1] ... [filter n]
 */
object StreamingHLL {

  /**
   * Entry point.
   *
   * @param args `master`, `username` and `password` (all required), followed by any number of
   *             filter terms that are handed to TwitterInputDStream unchanged. With fewer than
   *             three arguments the usage text is printed to stderr and the JVM exits with
   *             status 1.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <password>" +
        " [filter1] [filter2] ... [filter n]")
      System.exit(1)
    }

    val Array(master, username, password) = args.slice(0, 3)
    val filters = args.slice(3, args.length)

    val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
    // TwitterInputDStream is not imported above, so it is presumably declared in this package.
    val stream = new TwitterInputDStream(ssc, username, password, filters,
      StorageLevel.MEMORY_ONLY_SER)
    ssc.registerInputStream(stream)

    val users = stream.map(status => status.getUser.getId)

    // Single source of truth for the argument given to every HyperLogLogMonoid below.
    // NOTE(review): sketches built with different values presumably cannot be combined with
    // `+`, which is why the per-partition and the global monoid must agree - confirm.
    val hllBits = 12
    val globalHll = new HyperLogLogMonoid(hllBits)

    // Running state, updated from the output operations registered below.
    var userSet: Set[Long] = Set()
    var h = globalHll.zero

    // One sketch per user id, merged into a single sketch per batch.
    val approxUsers = users.mapPartitions(ids => {
      val hll = new HyperLogLogMonoid(hllBits)
      ids.map(id => hll(id))
    }).reduce(_ + _)

    // One singleton Set per user id, unioned into a single Set per batch.
    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)

    approxUsers.foreach(rdd => {
      // take(1) both tests for emptiness and fetches the reduced value, so only one job is
      // launched instead of the count() + first() pair.
      rdd.take(1).headOption.foreach { partial =>
        h += partial
        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
        println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
      }
    })

    exactUsers.foreach(rdd => {
      rdd.take(1).headOption.foreach { partial =>
        userSet ++= partial
        println("Exact distinct users this batch: %d".format(partial.size))
        println("Exact distinct users overall: %d".format(userSet.size))
        // NOTE(review): this reads `h`, which is updated by the approximate-count output
        // above; the figure is only meaningful if that output has already processed the same
        // batch - confirm the execution order of output operations.
        val errorPercent = ((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100
        println("Error rate: %2.5f%%".format(errorPercent))
      }
    })

    ssc.start()
  }
}