aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
author Nick Pentreath <nick.pentreath@gmail.com> 2013-02-19 13:21:33 +0200
committer Nick Pentreath <nick.pentreath@gmail.com> 2013-02-19 13:21:33 +0200
commit 015893f0e8983a7e249709d9820d1bf0dd74d607 (patch)
tree 308d47eef1e9a246255160a4d190782100adf6a9 /examples
parent b53174a6f3acca1b2a1fb95eb9779f737c802f68 (diff)
download spark-015893f0e8983a7e249709d9820d1bf0dd74d607.tar.gz
spark-015893f0e8983a7e249709d9820d1bf0dd74d607.tar.bz2
spark-015893f0e8983a7e249709d9820d1bf0dd74d607.zip
Adding streaming HyperLogLog example using Algebird
Diffstat (limited to 'examples')
-rw-r--r-- examples/pom.xml 5
-rw-r--r-- examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala 62
2 files changed, 67 insertions, 0 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index f43af670c6..28da3dbde4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -24,6 +24,11 @@
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>algebird-core_2.9.2</artifactId>
+ <version>0.1.8</version>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
new file mode 100644
index 0000000000..f67bb029c6
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
@@ -0,0 +1,62 @@
+package spark.streaming.examples.twitter
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird.HyperLogLog._
+import com.twitter.algebird.HyperLogLogMonoid
+
+/**
+ * Example of using the HyperLogLog monoid from Twitter's Algebird together with Spark
+ * Streaming's TwitterInputDStream: every non-empty batch prints approximate (HLL) and exact
+ * (Set) distinct-user counts, for that batch and accumulated over all batches so far.
+ */
+object StreamingHLL {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 3) {
+      System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" +
+        " [filter1] [filter2] ... [filter n]")
+      System.exit(1)
+    }
+
+    val Array(master, username, password) = args.slice(0, 3)
+    val filters = args.slice(3, args.length)
+
+    val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
+    val stream = new TwitterInputDStream(ssc, username, password, filters,
+      StorageLevel.MEMORY_ONLY_SER)
+    ssc.registerInputStream(stream)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    // Single constructor argument for every monoid below, so all HLLs here can be merged.
+    val hllBits = 12
+    val globalHll = new HyperLogLogMonoid(hllBits)
+    // Running totals across batches, updated by the two foreach callbacks further down.
+    var h = globalHll.zero
+    var userSet: Set[Long] = Set()
+
+    // Each partition builds its own monoid and maps every id to an HLL; reduce sums them.
+    val approxUsers = users.mapPartitions(ids => {
+      val hll = new HyperLogLogMonoid(hllBits)
+      ids.map(id => hll(id))
+    }).reduce(_ + _)
+
+    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+    // take(1) returns the reduced value (or nothing for an empty batch) in a single action.
+    approxUsers.foreach(rdd => rdd.take(1).headOption.foreach { partial =>
+      h += partial
+      println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+      println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
+    })
+
+    exactUsers.foreach(rdd => rdd.take(1).headOption.foreach { partial =>
+      userSet ++= partial
+      println("Exact distinct users this batch: %d".format(partial.size))
+      println("Exact distinct users overall: %d".format(userSet.size))
+      println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
+    })
+
+    ssc.start()
+  }
+}