diff options
-rw-r--r-- | examples/pom.xml | 7 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala | 78 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala | 62 | ||||
-rw-r--r-- | project/SparkBuild.scala | 3 | ||||
-rw-r--r-- | streaming/pom.xml | 10 |
5 files changed, 155 insertions, 5 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index f43af670c6..7d975875fa 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -20,11 +20,10 @@ <artifactId>jetty-server</artifactId> </dependency> <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-stream</artifactId> - <version>3.0.3</version> + <groupId>com.twitter</groupId> + <artifactId>algebird-core_2.9.2</artifactId> + <version>0.1.8</version> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala new file mode 100644 index 0000000000..d50e86aff5 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -0,0 +1,78 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird._ +import spark.streaming.StreamingContext._ +import spark.SparkContext._ + +/** + * Example of using CountMinSketch monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream + */ +object TwitterAlgebirdCMS { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + val DELTA = 1E-3 + val EPS = 0.01 + val SEED = 1 + val PERC = 0.001 + val TOPK = 10 + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) + val stream = ssc.twitterStream(username, password, filters, + StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + var globalCMS = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC).zero + var globalExact = Map[Long, Int]() + val mm = new MapMonoid[Long, Int]() + + val approxTopUsers = users.mapPartitions(ids => { + val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC) + ids.map(id => cms.create(id)) + }).reduce(_ ++ _) + + val exactTopUsers = users.map(id => (id, 1)) + .reduceByKey((a, b) => a + b) + + approxTopUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + val partialTopK = partial.heavyHitters.map(id => + (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + globalCMS ++= partial + val globalTopK = globalCMS.heavyHitters.map(id => + (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, + partialTopK.mkString("[", ",", "]"))) + println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, + globalTopK.mkString("[", ",", "]"))) + } + }) + + exactTopUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partialMap = rdd.collect().toMap + val partialTopK = rdd.map( + {case (id, count) => (count, id)}) + .sortByKey(ascending = false).take(TOPK) + globalExact = mm.plus(globalExact.toMap, partialMap) + val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) + println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) + } + }) + + ssc.start() + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala new file mode 100644 index 0000000000..c2095f5b94 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -0,0 +1,62 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import spark.storage.StorageLevel +import com.twitter.algebird.HyperLogLog._ +import com.twitter.algebird.HyperLogLogMonoid +import spark.streaming.dstream.TwitterInputDStream + +/** + * Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream to compute approximate distinct counts of userids. + */ +object TwitterAlgebirdHLL { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + /** Bit size parameter for HyperLogLog */ + val BIT_SIZE = 12 + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + val hll = new HyperLogLogMonoid(BIT_SIZE) + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + approxUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + globalHll += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) + } + }) + + exactUsers.foreach(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100)) + } + }) + + ssc.start() + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c6d3cc8b15..090fd65bcb 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -154,7 +154,8 @@ object SparkBuild extends Build { ) def examplesSettings = sharedSettings ++ Seq( - name := "spark-examples" + name := "spark-examples", + libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8") ) def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") diff --git a/streaming/pom.xml b/streaming/pom.xml index 6ee7e59df3..d78c39da0d 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -47,6 +47,16 @@ <artifactId>zkclient</artifactId> <version>0.1</version> </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>3.0.3</version> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-core</artifactId> + <version>3.0.3</version> + </dependency> <dependency> <groupId>org.scalatest</groupId> |