diff options
author | Jeff L <sha0lin@alumni.carnegiemellon.edu> | 2015-12-18 15:06:54 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-12-18 15:06:54 +0000 |
commit | ea59b0f3a6600f8046e5f3f55e89257614fb1f10 (patch) | |
tree | f2a7a4df2c5ece58253b98a0a60b598730f91531 /examples | |
parent | 2bebaa39d9da33bc93ef682959cd42c1968a6a3e (diff) | |
download | spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.gz spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.bz2 spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.zip |
[SPARK-9057][STREAMING] Twitter example joining to static RDD of word sentiment values
Example of joining a static RDD of word sentiments to a streaming RDD of Tweets in order to demo the usage of the transform() method.
Author: Jeff L <sha0lin@alumni.carnegiemellon.edu>
Closes #8431 from Agent007/SPARK-9057.
Diffstat (limited to 'examples')
3 files changed, 353 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java new file mode 100644 index 0000000000..030ee30b93 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.twitter.TwitterUtils; +import scala.Tuple2; +import twitter4j.Status; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +/** + * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of + * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN) + */ +public class JavaTwitterHashTagJoinSentiments { + + public static void main(String[] args) throws IOException { + if (args.length < 4) { + System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" + + " <access token> <access token secret> [<filters>]"); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + String consumerKey = args[0]; + String consumerSecret = args[1]; + String accessToken = args[2]; + String accessTokenSecret = args[3]; + String[] filters = Arrays.copyOfRange(args, 4, args.length); + + // Set the system properties so that Twitter4j library used by Twitter stream + // can use them to generate OAuth credentials + System.setProperty("twitter4j.oauth.consumerKey", consumerKey); + System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret); + System.setProperty("twitter4j.oauth.accessToken", accessToken); + System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret); + + SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments"); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); + JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters); + + JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() { + @Override + public Iterable<String> call(Status s) { + return Arrays.asList(s.getText().split(" ")); + } + }); + + JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() { + @Override + public Boolean call(String word) throws Exception { + return word.startsWith("#"); + } + }); + + // Read in the word-sentiment list and create a static RDD from it + String wordSentimentFilePath = "data/streaming/AFINN-111.txt"; + final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath) + .mapToPair(new PairFunction<String, String, Double>(){ + @Override + public Tuple2<String, Double> call(String line) { + String[] columns = line.split("\t"); + return new Tuple2<String, Double>(columns[0], + Double.parseDouble(columns[1])); + } + }); + + JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) { + // leave out the # character + return new Tuple2<String, Integer>(s.substring(1), 1); + } + }); + + JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow( + new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer a, Integer b) { + return a + b; + } + }, new Duration(10000)); + + // Determine the hash tags with the highest sentiment values by joining the streaming RDD + // with the static RDD inside the transform() method and then multiplying + // the frequency of the hash tag by its sentiment value + JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples = + hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>, + JavaPairRDD<String, Tuple2<Double, Integer>>>() { + @Override + public JavaPairRDD<String, Tuple2<Double, Integer>> call(JavaPairRDD<String, + Integer> topicCount) + throws Exception { + return wordSentiments.join(topicCount); + } + }); + + JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair( + new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() { + @Override + public Tuple2<String, Double> call(Tuple2<String, + Tuple2<Double, Integer>> topicAndTuplePair) throws Exception { + Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2(); + return new Tuple2<String, Double>(topicAndTuplePair._1(), + happinessAndCount._1() * happinessAndCount._2()); + } + }); + + JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair( + new PairFunction<Tuple2<String, Double>, Double, String>() { + @Override + public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) + throws Exception { + return new Tuple2<Double, String>(topicHappiness._2(), + topicHappiness._1()); + } + }); + + JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair( + new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() { + @Override + public JavaPairRDD<Double, String> call(JavaPairRDD<Double, + String> happinessAndTopics) throws Exception { + return happinessAndTopics.sortByKey(false); + } + } + ); + + // Print hash tags with the most positive sentiment values + happiest10.foreachRDD(new Function<JavaPairRDD<Double, String>, Void>() { + @Override + public Void call(JavaPairRDD<Double, String> happinessTopicPairs) throws Exception { + List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10); + System.out.println( + String.format("\nHappiest topics in last 10 seconds (%s total):", + happinessTopicPairs.count())); + for (Tuple2<Double, String> pair : topList) { + System.out.println( + String.format("%s (%s happiness)", pair._2(), pair._1())); + } + return null; + } + }); + + jssc.start(); + jssc.awaitTermination(); + } +} diff --git a/examples/src/main/python/streaming/network_wordjoinsentiments.py b/examples/src/main/python/streaming/network_wordjoinsentiments.py new file mode 100644 index 0000000000..b85517dfdd --- /dev/null +++ b/examples/src/main/python/streaming/network_wordjoinsentiments.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Shows the most positive words in UTF8 encoded, '\n' delimited text directly received the network + every 5 seconds. The streaming data is joined with a static RDD of the AFINN word list + (http://neuro.imm.dtu.dk/wiki/AFINN) + + Usage: network_wordjoinsentiments.py <hostname> <port> + <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. + + To run this on your local machine, you need to first run a Netcat server + `$ nc -lk 9999` + and then run the example + `$ bin/spark-submit examples/src/main/python/streaming/network_wordjoinsentiments.py \ + localhost 9999` +""" + +from __future__ import print_function + +import sys + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext + + +def print_happiest_words(rdd): + top_list = rdd.take(5) + print("Happiest topics in the last 5 seconds (%d total):" % rdd.count()) + for tuple in top_list: + print("%s (%d happiness)" % (tuple[1], tuple[0])) + +if __name__ == "__main__": + if len(sys.argv) != 3: + print("Usage: network_wordjoinsentiments.py <hostname> <port>", file=sys.stderr) + exit(-1) + + sc = SparkContext(appName="PythonStreamingNetworkWordJoinSentiments") + ssc = StreamingContext(sc, 5) + + # Read in the word-sentiment list and create a static RDD from it + word_sentiments_file_path = "data/streaming/AFINN-111.txt" + word_sentiments = ssc.sparkContext.textFile(word_sentiments_file_path) \ + .map(lambda line: tuple(line.split("\t"))) + + lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) + + word_counts = lines.flatMap(lambda line: line.split(" ")) \ + .map(lambda word: (word, 1)) \ + .reduceByKey(lambda a, b: a + b) + + # Determine the words with the highest sentiment values by joining the streaming RDD + # with the static RDD inside the transform() method and then multiplying + # the frequency of the words by its sentiment value + happiest_words = word_counts.transform(lambda rdd: word_sentiments.join(rdd)) \ + .map(lambda (word, tuple): (word, float(tuple[0]) * tuple[1])) \ + .map(lambda (word, happiness): (happiness, word)) \ + .transform(lambda rdd: rdd.sortByKey(False)) + + happiest_words.foreachRDD(print_happiest_words) + + ssc.start() + ssc.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala new file mode 100644 index 0000000000..0328fa81ea --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.streaming + +import org.apache.spark.SparkConf +import org.apache.spark.streaming.twitter.TwitterUtils +import org.apache.spark.streaming.{Seconds, StreamingContext} + +/** + * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of + * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN) + */ +object TwitterHashTagJoinSentiments { + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " + + "<access token> <access token secret> [<filters>]") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) + val filters = args.takeRight(args.length - 4) + + // Set the system properties so that Twitter4j library used by Twitter stream + // can use them to generate OAuth credentials + System.setProperty("twitter4j.oauth.consumerKey", consumerKey) + System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) + System.setProperty("twitter4j.oauth.accessToken", accessToken) + System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) + + val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + val stream = TwitterUtils.createStream(ssc, None, filters) + + val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) + + // Read in the word-sentiment list and create a static RDD from it + val wordSentimentFilePath = "data/streaming/AFINN-111.txt" + val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line => + val Array(word, happinessValue) = line.split("\t") + (word, happinessValue) + } cache() + + // Determine the hash tags with the highest sentiment values by joining the streaming RDD + // with the static RDD inside the transform() method and then multiplying + // the frequency of the hash tag by its sentiment value + val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1)) + .reduceByKeyAndWindow(_ + _, Seconds(60)) + .transform{topicCount => wordSentiments.join(topicCount)} + .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)} + .map{case (topic, happinessValue) => (happinessValue, topic)} + .transform(_.sortByKey(false)) + + val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1)) + .reduceByKeyAndWindow(_ + _, Seconds(10)) + .transform{topicCount => wordSentiments.join(topicCount)} + .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)} + .map{case (topic, happinessValue) => (happinessValue, topic)} + .transform(_.sortByKey(false)) + + // Print hash tags with the most positive sentiment values + happiest60.foreachRDD(rdd => { + val topList = rdd.take(10) + println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count())) + topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))} + }) + + happiest10.foreachRDD(rdd => { + val topList = rdd.take(10) + println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count())) + topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))} + }) + + ssc.start() + ssc.awaitTermination() + } +} +// scalastyle:on println |