aboutsummaryrefslogtreecommitdiff
path: root/examples/scala-2.10/src
diff options
context:
space:
mode:
authorAdam Pingel <adam@axle-lang.org>2014-11-17 10:47:29 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-17 10:47:46 -0800
commite0ab1c4766e1af384213a853588f6e69acd3b780 (patch)
treeae40a3e6e23bea261c2f2fad42fc49896b4df10b /examples/scala-2.10/src
parentd9d36a53dfeb51e4e070803e26187d436fd1f747 (diff)
downloadspark-e0ab1c4766e1af384213a853588f6e69acd3b780.tar.gz
spark-e0ab1c4766e1af384213a853588f6e69acd3b780.tar.bz2
spark-e0ab1c4766e1af384213a853588f6e69acd3b780.zip
SPARK-2811 upgrade algebird to 0.8.1
Author: Adam Pingel <adam@axle-lang.org> Closes #3282 from adampingel/master and squashes the following commits: 70c8d3c [Adam Pingel] relocate the algebird example back to example/src 7a9d8be [Adam Pingel] SPARK-2811 upgrade algebird to 0.8.1 (cherry picked from commit e7690ed20a2734b7ca88e78a60a8e75ba19e9d8b) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Diffstat (limited to 'examples/scala-2.10/src')
-rw-r--r--examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala114
-rw-r--r--examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala92
2 files changed, 0 insertions, 206 deletions
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
deleted file mode 100644
index 683752ac96..0000000000
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- * <br>
- * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
- * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- * the same approach could be used for computing popular topics for example.
- * <p>
- * <p>
- * <a href=
- * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- * of any given element, etc), that uses space sub-linear in the number of elements in the
- * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- * count.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdCMS {
- def main(args: Array[String]) {
- StreamingExamples.setStreamingLogLevels()
-
- // CMS parameters
- val DELTA = 1E-3
- val EPS = 0.01
- val SEED = 1
- val PERC = 0.001
- // K highest frequency elements to take
- val TOPK = 10
-
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
- val ssc = new StreamingContext(sparkConf, Seconds(10))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
-
- val users = stream.map(status => status.getUser.getId)
-
- val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
- var globalCMS = cms.zero
- val mm = new MapMonoid[Long, Int]()
- var globalExact = Map[Long, Int]()
-
- val approxTopUsers = users.mapPartitions(ids => {
- ids.map(id => cms.create(id))
- }).reduce(_ ++ _)
-
- val exactTopUsers = users.map(id => (id, 1))
- .reduceByKey((a, b) => a + b)
-
- approxTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- val partialTopK = partial.heavyHitters.map(id =>
- (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- globalCMS ++= partial
- val globalTopK = globalCMS.heavyHitters.map(id =>
- (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
- partialTopK.mkString("[", ",", "]")))
- println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
- globalTopK.mkString("[", ",", "]")))
- }
- })
-
- exactTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partialMap = rdd.collect().toMap
- val partialTopK = rdd.map(
- {case (id, count) => (count, id)})
- .sortByKey(ascending = false).take(TOPK)
- globalExact = mm.plus(globalExact.toMap, partialMap)
- val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
- println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
- }
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
deleted file mode 100644
index 62db5e663b..0000000000
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird.HyperLogLogMonoid
-import com.twitter.algebird.HyperLogLog._
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
-
-// scalastyle:off
-/**
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- * <p>
- * <p>
- * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * blog post</a> and this
- * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
- * blog post</a>
- * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
- * estimating the cardinality of a data stream, i.e. the number of unique elements.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdHLL {
- def main(args: Array[String]) {
-
- StreamingExamples.setStreamingLogLevels()
-
- /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
- val BIT_SIZE = 12
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
- val ssc = new StreamingContext(sparkConf, Seconds(5))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
-
- val users = stream.map(status => status.getUser.getId)
-
- val hll = new HyperLogLogMonoid(BIT_SIZE)
- var globalHll = hll.zero
- var userSet: Set[Long] = Set()
-
- val approxUsers = users.mapPartitions(ids => {
- ids.map(id => hll(id))
- }).reduce(_ + _)
-
- val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
-
- approxUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- globalHll += partial
- println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
- println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
- }
- })
-
- exactUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- userSet ++= partial
- println("Exact distinct users this batch: %d".format(partial.size))
- println("Exact distinct users overall: %d".format(userSet.size))
- println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
- ) * 100))
- }
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}