Diffstat (limited to 'examples/scala-2.10')
 examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java    | 113 +++++++++
 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala      | 102 ++++++++
 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala  | 114 +++++++++
 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala  |  92 ++++++
4 files changed, 421 insertions, 0 deletions
diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
new file mode 100644
index 0000000000..16ae9a3319
--- /dev/null
+++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ *
+ * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * To run this example:
+ *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
+ *    zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  private JavaKafkaWordCount() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
+    // Create the context with a 2 second batch size
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+
+    int numThreads = Integer.parseInt(args[3]);
+    Map<String, Integer> topicMap = new HashMap<String, Integer>();
+    String[] topics = args[2].split(",");
+    for (String topic: topics) {
+      topicMap.put(topic, numThreads);
+    }
+
+    JavaPairReceiverInputDStream<String, String> messages =
+        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
+
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) {
+        return tuple2._2();
+      }
+    });
+
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}
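The pipeline above is the canonical streaming word count: take the message value out of each (key, value) pair, split it on single spaces, pair every word with a count of 1, and sum the counts per word within each batch. As a Kafka-free illustration of what one batch computes, here is a minimal Scala sketch over a plain collection; the sample lines and the name WordCountSketch are illustrative only, not part of the patch:

    // What the map/flatMap/mapToPair/reduceByKey chain computes per batch,
    // modeled on an in-memory collection instead of a DStream.
    object WordCountSketch {
      def main(args: Array[String]): Unit = {
        val lines = Seq("spark streaming kafka", "spark kafka")
        val counts = lines
          .flatMap(_.split(" "))                     // like SPACE.split(x)
          .map(word => (word, 1))                    // like mapToPair(s => (s, 1))
          .groupBy(_._1)                             // bring equal words together
          .map { case (w, ones) => (w, ones.size) }  // like reduceByKey(_ + _)
        println(counts)  // e.g. Map(spark -> 2, streaming -> 1, kafka -> 2)
      }
    }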
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
new file mode 100644
index 0000000000..c9e1511278
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.util.Properties
+
+import kafka.producer._
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ *   `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ *      my-consumer-group topic1,topic2 1`
+ */
+object KafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(zkQuorum, group, topics, numThreads) = args
+    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    ssc.checkpoint("checkpoint")
+
+    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
+    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L))
+      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+// Produces messages of random words (each word is a random digit, 0-9).
+object KafkaWordCountProducer {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+        "<messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+    // Kafka producer connection properties
+    val props = new Properties()
+    props.put("metadata.broker.list", brokers)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // Send some messages
+    while (true) {
+      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+        val str = (1 to wordsPerMessage.toInt).map(_ => scala.util.Random.nextInt(10).toString)
+          .mkString(" ")
+        new KeyedMessage[String, String](topic, str)
+      }.toArray
+
+      producer.send(messages: _*)
+      Thread.sleep(100)
+    }
+  }
+}
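KafkaWordCount differs from the Java variant in one important way: it counts words over a 10-minute sliding window via reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2). Supplying the inverse function _ - _ lets Spark update the window incrementally, adding the counts of the batch that just entered and subtracting those of the batch that just left, instead of re-reducing the whole window; it is also why ssc.checkpoint("checkpoint") is required. A hedged sketch of that incremental update on plain Scala maps (illustrative only, not Spark's implementation):

    // One window slide: add counts from the entering batch (reduceFunc)
    // and subtract counts from the leaving batch (invReduceFunc).
    def slideWindow(
        windowTotal: Map[String, Long],
        entering: Map[String, Long],
        leaving: Map[String, Long]): Map[String, Long] = {
      val added = entering.foldLeft(windowTotal) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, 0L) + v)   // _ + _
      }
      leaving.foldLeft(added) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, 0L) - v)   // _ - _
      }
    }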
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
new file mode 100644
index 0000000000..683752ac96
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.twitter._
+
+// scalastyle:off
+/**
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ * the same approach could then be used to compute popular topics, for example.
+ * <p>
+ * <p>
+ * <a href=
+ * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ * of any given element, etc.) that uses space sub-linear in the number of elements in the
+ * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ * as well as "heavy hitters" that occur more often than a threshold percentage of the overall
+ * total count.
+ * <p><p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ * reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdCMS {
+  def main(args: Array[String]) {
+    StreamingExamples.setStreamingLogLevels()
+
+    // CMS parameters
+    val DELTA = 1E-3
+    val EPS = 0.01
+    val SEED = 1
+    val PERC = 0.001
+    // K highest-frequency elements to take
+    val TOPK = 10
+
+    val filters = args
+    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
+    val ssc = new StreamingContext(sparkConf, Seconds(10))
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
+    var globalCMS = cms.zero
+    val mm = new MapMonoid[Long, Int]()
+    var globalExact = Map[Long, Int]()
+
+    val approxTopUsers = users.mapPartitions(ids => {
+      ids.map(id => cms.create(id))
+    }).reduce(_ ++ _)
+
+    val exactTopUsers = users.map(id => (id, 1))
+      .reduceByKey((a, b) => a + b)
+
+    approxTopUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        val partialTopK = partial.heavyHitters.map(id =>
+          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        globalCMS ++= partial
+        val globalTopK = globalCMS.heavyHitters.map(id =>
+          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        // PERC is a fraction, so scale it to a percentage for printing
+        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC * 100,
+          partialTopK.mkString("[", ",", "]")))
+        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC * 100,
+          globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    exactTopUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partialMap = rdd.collect().toMap
+        val partialTopK = rdd.map(
+          {case (id, count) => (count, id)})
+          .sortByKey(ascending = false).take(TOPK)
+        globalExact = mm.plus(globalExact.toMap, partialMap)
+        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
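The reduce(_ ++ _) and globalCMS ++= partial steps above work because a Count-Min Sketch is mergeable: two sketches built with the same parameters combine by element-wise addition of their counter tables, and estimation takes the minimum over the hash rows. Here is a toy Scala version of that structure, under those standard assumptions; it is illustrative only and not Algebird's CMS, which additionally tracks heavy hitters:

    import scala.util.hashing.MurmurHash3

    // d rows of w counters; add increments one counter per row, estimate
    // takes the min across rows, merge sums the tables element-wise.
    final case class TinyCMS(width: Int, table: Vector[Vector[Long]]) {
      private def bucket(row: Int, item: Long): Int = {
        val h = MurmurHash3.productHash((row, item))
        ((h % width) + width) % width            // non-negative index
      }
      def add(item: Long): TinyCMS =
        copy(table = table.indices.foldLeft(table) { (t, r) =>
          val b = bucket(r, item)
          t.updated(r, t(r).updated(b, t(r)(b) + 1L))
        })
      def estimate(item: Long): Long =
        table.indices.map(r => table(r)(bucket(r, item))).min
      def merge(other: TinyCMS): TinyCMS =       // the monoid "plus"
        copy(table = table.zip(other.table).map { case (a, b) =>
          a.zip(b).map { case (x, y) => x + y }
        })
    }

    object TinyCMS {
      def empty(depth: Int, width: Int): TinyCMS =
        TinyCMS(width, Vector.fill(depth)(Vector.fill(width)(0L)))
    }

Because the merge never looks at the raw stream, each partition can build its own sketch and the results can be folded together afterwards, which is exactly the shape of the mapPartitions-then-reduce used in the example.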
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
new file mode 100644
index 0000000000..62db5e663b
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+import org.apache.spark.SparkConf
+
+// scalastyle:off
+/**
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
+ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p>
+ * <p>
+ * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * blog post</a> and this
+ * <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ * blog post</a>
+ * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for
+ * estimating the cardinality of a data stream, i.e. the number of unique elements.
+ * <p><p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ * reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdHLL {
+  def main(args: Array[String]) {
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Bit size parameter for HyperLogLog; trades off accuracy vs. size
+    val BIT_SIZE = 12
+    val filters = args
+    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val hll = new HyperLogLogMonoid(BIT_SIZE)
+    var globalHll = hll.zero
+    var userSet: Set[Long] = Set()
+
+    val approxUsers = users.mapPartitions(ids => {
+      ids.map(id => hll(id))
+    }).reduce(_ + _)
+
+    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+    approxUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        globalHll += partial
+        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+      }
+    })
+
+    exactUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        userSet ++= partial
+        println("Exact distinct users this batch: %d".format(partial.size))
+        println("Exact distinct users overall: %d".format(userSet.size))
+        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
+          ) * 100))
+      }
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
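The BIT_SIZE = 12 chosen above fixes the accuracy of the estimate: HyperLogLog's standard error is roughly 1.04 / sqrt(m), where m = 2^bits is the number of registers (this is the standard HLL bound, stated here for context rather than taken from the patch). A quick check in Scala:

    val bits = 12                          // BIT_SIZE used in the example
    val m = 1 << bits                      // 4096 registers
    val stdError = 1.04 / math.sqrt(m)     // ~0.0163
    println(f"expected error: ${stdError * 100}%.2f%%")  // ~1.63%

so the "Error rate" printed by the example should settle around one to two percent once enough users have been observed.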