diff options
author | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-19 13:21:33 +0200 |
---|---|---|
committer | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-19 13:21:33 +0200 |
commit | 015893f0e8983a7e249709d9820d1bf0dd74d607 (patch) | |
tree | 308d47eef1e9a246255160a4d190782100adf6a9 /examples | |
parent | b53174a6f3acca1b2a1fb95eb9779f737c802f68 (diff) | |
download | spark-015893f0e8983a7e249709d9820d1bf0dd74d607.tar.gz spark-015893f0e8983a7e249709d9820d1bf0dd74d607.tar.bz2 spark-015893f0e8983a7e249709d9820d1bf0dd74d607.zip |
Adding streaming HyperLogLog example using Algebird
Diffstat (limited to 'examples')
-rw-r--r-- | examples/pom.xml | 5 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala | 62 |
2 files changed, 67 insertions, 0 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index f43af670c6..28da3dbde4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -24,6 +24,11 @@ <artifactId>twitter4j-stream</artifactId> <version>3.0.3</version> </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>algebird-core_2.9.2</artifactId> + <version>0.1.8</version> + </dependency> <dependency> <groupId>org.scalatest</groupId> diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala new file mode 100644 index 0000000000..f67bb029c6 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala @@ -0,0 +1,62 @@ +package spark.streaming.examples.twitter + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird.HyperLogLog._ +import com.twitter.algebird.HyperLogLogMonoid + +/** + * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream + */ +object StreamingHLL { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2)) + val stream = new TwitterInputDStream(ssc, username, password, filters, + StorageLevel.MEMORY_ONLY_SER) + ssc.registerInputStream(stream) + + val users = stream.map(status => status.getUser.getId) + + val globalHll = new HyperLogLogMonoid(12) + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + val hll = new HyperLogLogMonoid(12) + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + var h = globalHll.zero + approxUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + h += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt)) + } + }) + + exactUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100)) + } + }) + + ssc.start() + } +} |