diff options
author | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-19 17:42:57 +0200 |
---|---|---|
committer | Nick Pentreath <nick.pentreath@gmail.com> | 2013-02-19 17:42:57 +0200 |
commit | d8ee184d950ac91ad4f617fc0d7b2f02ed6b74d3 (patch) | |
tree | 56957df9c7c9753d661fee987dcaed017326b145 /examples | |
parent | 315ea069e8aeb78dde23836827bd51462208aa7a (diff) | |
download | spark-d8ee184d950ac91ad4f617fc0d7b2f02ed6b74d3.tar.gz spark-d8ee184d950ac91ad4f617fc0d7b2f02ed6b74d3.tar.bz2 spark-d8ee184d950ac91ad4f617fc0d7b2f02ed6b74d3.zip |
Update dependencies and refactor the streaming HLL example to use the context.twitterStream method
Diffstat (limited to 'examples')
-rw-r--r-- | examples/pom.xml | 6 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala) | 29 |
2 files changed, 14 insertions, 21 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index 28da3dbde4..7d975875fa 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -20,16 +20,10 @@ <artifactId>jetty-server</artifactId> </dependency> <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-stream</artifactId> - <version>3.0.3</version> - </dependency> - <dependency> <groupId>com.twitter</groupId> <artifactId>algebird-core_2.9.2</artifactId> <version>0.1.8</version> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index 023a0add80..c2095f5b94 100644 --- a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -1,4 +1,4 @@ -package spark.streaming.examples.twitter +package spark.streaming.examples import spark.streaming.{Seconds, StreamingContext} import spark.storage.StorageLevel @@ -7,44 +7,43 @@ import com.twitter.algebird.HyperLogLogMonoid import spark.streaming.dstream.TwitterInputDStream /** - * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's - * TwitterInputDStream + * Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream to compute approximate distinct counts of userids. */ -object StreamingHLL { +object TwitterAlgebirdHLL { def main(args: Array[String]) { if (args.length < 3) { - System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" + + System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" + " [filter1] [filter2] ... 
[filter n]") System.exit(1) } + /** Bit size parameter for HyperLogLog */ + val BIT_SIZE = 12 val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters, - StorageLevel.MEMORY_ONLY_SER) - ssc.registerInputStream(stream) + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) - val globalHll = new HyperLogLogMonoid(12) + var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero var userSet: Set[Long] = Set() val approxUsers = users.mapPartitions(ids => { - val hll = new HyperLogLogMonoid(12) + val hll = new HyperLogLogMonoid(BIT_SIZE) ids.map(id => hll(id)) }).reduce(_ + _) val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - var h = globalHll.zero approxUsers.foreach(rdd => { if (rdd.count() != 0) { val partial = rdd.first() - h += partial + globalHll += partial println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) - println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) } }) @@ -54,7 +53,7 @@ object StreamingHLL { userSet ++= partial println("Exact distinct users this batch: %d".format(partial.size)) println("Exact distinct users overall: %d".format(userSet.size)) - println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100)) } }) |