aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/pom.xml5
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala62
-rw-r--r--project/SparkBuild.scala3
3 files changed, 69 insertions, 1 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index f43af670c6..28da3dbde4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -24,6 +24,11 @@
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>algebird-core_2.9.2</artifactId>
+ <version>0.1.8</version>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
new file mode 100644
index 0000000000..f67bb029c6
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
@@ -0,0 +1,62 @@
+package spark.streaming.examples.twitter
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird.HyperLogLog._
+import com.twitter.algebird.HyperLogLogMonoid
+
+/**
+ * Estimates distinct Twitter users per batch and overall with Algebird's HyperLogLog monoid
+ * on a TwitterInputDStream, printing exact Set-based counts and the error rate for comparison.
+ */
+object StreamingHLL {
+ def main(args: Array[String]) { // args: <master> <twitter_username> <twitter_password> [filters]
+ if (args.length < 3) { // the three leading arguments are mandatory
+ System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1) // exit with a non-zero status after printing usage
+ }
+
+ val Array(master, username, password) = args.slice(0, 3) // first three positional args
+ val filters = args.slice(3, args.length) // everything after them; may be empty
+ // Seconds(2) is presumably the batch interval -- TODO confirm against StreamingContext
+ val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
+ val stream = new TwitterInputDStream(ssc, username, password, filters,
+ StorageLevel.MEMORY_ONLY_SER)
+ ssc.registerInputStream(stream)
+
+ val users = stream.map(status => status.getUser.getId) // one user id per status
+ // 12 is presumably the HLL bit precision -- TODO confirm against Algebird's HyperLogLogMonoid
+ val globalHll = new HyperLogLogMonoid(12)
+ var userSet: Set[Long] = Set() // exact set of every user id seen so far
+ // Approximate path: turn each id into an HLL value, then combine them all with +
+ val approxUsers = users.mapPartitions(ids => {
+ val hll = new HyperLogLogMonoid(12) // same parameter as globalHll
+ ids.map(id => hll(id))
+ }).reduce(_ + _)
+ // Exact path: wrap each id in a singleton Set and union them with ++
+ val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+ var h = globalHll.zero // running HLL value accumulated across batches
+ approxUsers.foreach(rdd => {
+ if (rdd.count() != 0) { // skip RDDs with no elements
+ val partial = rdd.first() // first element is taken as this batch's combined HLL
+ h += partial
+ println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+ println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
+ }
+ })
+ // NOTE(review): error rate reads h, updated only by the approxUsers callback -- confirm order
+ exactUsers.foreach(rdd => {
+ if (rdd.count() != 0) { // skip RDDs with no elements
+ val partial = rdd.first() // first element is taken as this batch's combined id set
+ userSet ++= partial // fold this batch's ids into the running exact set
+ println("Exact distinct users this batch: %d".format(partial.size))
+ println("Exact distinct users overall: %d".format(userSet.size))
+ println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
+ }
+ })
+
+ ssc.start()
+ }
+}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index af8b5ba017..18cc9ea90e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -156,7 +156,8 @@ object SparkBuild extends Build {
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
libraryDependencies ++= Seq(
- "org.twitter4j" % "twitter4j-stream" % "3.0.3"
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+ "com.twitter" % "algebird-core_2.9.2" % "0.1.8"
)
)