aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/pom.xml6
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala)29
-rw-r--r--streaming/pom.xml10
3 files changed, 24 insertions, 21 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index 28da3dbde4..7d975875fa 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -20,16 +20,10 @@
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-stream</artifactId>
- <version>3.0.3</version>
- </dependency>
- <dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.9.2</artifactId>
<version>0.1.8</version>
</dependency>
-
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 023a0add80..c2095f5b94 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -1,4 +1,4 @@
-package spark.streaming.examples.twitter
+package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
@@ -7,44 +7,43 @@ import com.twitter.algebird.HyperLogLogMonoid
import spark.streaming.dstream.TwitterInputDStream
/**
- * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
- * TwitterInputDStream
+ * Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
+ * TwitterInputDStream to compute approximate distinct counts of userids.
*/
-object StreamingHLL {
+object TwitterAlgebirdHLL {
def main(args: Array[String]) {
if (args.length < 3) {
- System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" +
+ System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
+ /** Bit size parameter for HyperLogLog */
+ val BIT_SIZE = 12
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
- val stream = new TwitterInputDStream(ssc, username, password, filters,
- StorageLevel.MEMORY_ONLY_SER)
- ssc.registerInputStream(stream)
+ val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
+ val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
- val globalHll = new HyperLogLogMonoid(12)
+ var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero
var userSet: Set[Long] = Set()
val approxUsers = users.mapPartitions(ids => {
- val hll = new HyperLogLogMonoid(12)
+ val hll = new HyperLogLogMonoid(BIT_SIZE)
ids.map(id => hll(id))
}).reduce(_ + _)
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
- var h = globalHll.zero
approxUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
- h += partial
+ globalHll += partial
println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
- println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
+ println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
}
})
@@ -54,7 +53,7 @@ object StreamingHLL {
userSet ++= partial
println("Exact distinct users this batch: %d".format(partial.size))
println("Exact distinct users overall: %d".format(userSet.size))
- println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
+ println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
}
})
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 6ee7e59df3..d78c39da0d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -47,6 +47,16 @@
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>3.0.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>3.0.3</version>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>