aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorJeff L <sha0lin@alumni.carnegiemellon.edu>2015-12-18 15:06:54 +0000
committerSean Owen <sowen@cloudera.com>2015-12-18 15:06:54 +0000
commitea59b0f3a6600f8046e5f3f55e89257614fb1f10 (patch)
treef2a7a4df2c5ece58253b98a0a60b598730f91531 /examples
parent2bebaa39d9da33bc93ef682959cd42c1968a6a3e (diff)
downloadspark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.gz
spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.bz2
spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.zip
[SPARK-9057][STREAMING] Twitter example joining to static RDD of word sentiment values
Example of joining a static RDD of word sentiments to a streaming RDD of Tweets in order to demo the usage of the transform() method. Author: Jeff L <sha0lin@alumni.carnegiemellon.edu> Closes #8431 from Agent007/SPARK-9057.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java180
-rw-r--r--examples/src/main/python/streaming/network_wordjoinsentiments.py77
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala96
3 files changed, 353 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
new file mode 100644
index 0000000000..030ee30b93
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.twitter.TwitterUtils;
+import scala.Tuple2;
+import twitter4j.Status;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
+ * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
+ */
+public class JavaTwitterHashTagJoinSentiments {
+
+ public static void main(String[] args) throws IOException {
+ if (args.length < 4) {
+ System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
+ " <access token> <access token secret> [<filters>]");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ String consumerKey = args[0];
+ String consumerSecret = args[1];
+ String accessToken = args[2];
+ String accessTokenSecret = args[3];
+ String[] filters = Arrays.copyOfRange(args, 4, args.length);
+
+ // Set the system properties so that Twitter4j library used by Twitter stream
+ // can use them to generate OAuth credentials
+ System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
+ System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
+ System.setProperty("twitter4j.oauth.accessToken", accessToken);
+ System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
+
+ SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+ JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
+
+ JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
+ @Override
+ public Iterable<String> call(Status s) {
+ return Arrays.asList(s.getText().split(" "));
+ }
+ });
+
+ JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
+ @Override
+ public Boolean call(String word) throws Exception {
+ return word.startsWith("#");
+ }
+ });
+
+ // Read in the word-sentiment list and create a static RDD from it
+ String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
+ final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath)
+ .mapToPair(new PairFunction<String, String, Double>(){
+ @Override
+ public Tuple2<String, Double> call(String line) {
+ String[] columns = line.split("\t");
+ return new Tuple2<String, Double>(columns[0],
+ Double.parseDouble(columns[1]));
+ }
+ });
+
+ JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ // leave out the # character
+ return new Tuple2<String, Integer>(s.substring(1), 1);
+ }
+ });
+
+ JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer a, Integer b) {
+ return a + b;
+ }
+ }, new Duration(10000));
+
+ // Determine the hash tags with the highest sentiment values by joining the streaming RDD
+ // with the static RDD inside the transform() method and then multiplying
+ // the frequency of the hash tag by its sentiment value
+ JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
+ hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
+ JavaPairRDD<String, Tuple2<Double, Integer>>>() {
+ @Override
+ public JavaPairRDD<String, Tuple2<Double, Integer>> call(JavaPairRDD<String,
+ Integer> topicCount)
+ throws Exception {
+ return wordSentiments.join(topicCount);
+ }
+ });
+
+ JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
+ new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
+ @Override
+ public Tuple2<String, Double> call(Tuple2<String,
+ Tuple2<Double, Integer>> topicAndTuplePair) throws Exception {
+ Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
+ return new Tuple2<String, Double>(topicAndTuplePair._1(),
+ happinessAndCount._1() * happinessAndCount._2());
+ }
+ });
+
+ JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
+ new PairFunction<Tuple2<String, Double>, Double, String>() {
+ @Override
+ public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness)
+ throws Exception {
+ return new Tuple2<Double, String>(topicHappiness._2(),
+ topicHappiness._1());
+ }
+ });
+
+ JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
+ new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
+ @Override
+ public JavaPairRDD<Double, String> call(JavaPairRDD<Double,
+ String> happinessAndTopics) throws Exception {
+ return happinessAndTopics.sortByKey(false);
+ }
+ }
+ );
+
+ // Print hash tags with the most positive sentiment values
+ happiest10.foreachRDD(new Function<JavaPairRDD<Double, String>, Void>() {
+ @Override
+ public Void call(JavaPairRDD<Double, String> happinessTopicPairs) throws Exception {
+ List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
+ System.out.println(
+ String.format("\nHappiest topics in last 10 seconds (%s total):",
+ happinessTopicPairs.count()));
+ for (Tuple2<Double, String> pair : topList) {
+ System.out.println(
+ String.format("%s (%s happiness)", pair._2(), pair._1()));
+ }
+ return null;
+ }
+ });
+
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
diff --git a/examples/src/main/python/streaming/network_wordjoinsentiments.py b/examples/src/main/python/streaming/network_wordjoinsentiments.py
new file mode 100644
index 0000000000..b85517dfdd
--- /dev/null
+++ b/examples/src/main/python/streaming/network_wordjoinsentiments.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Shows the most positive words in UTF8 encoded, '\n' delimited text directly received the network
+ every 5 seconds. The streaming data is joined with a static RDD of the AFINN word list
+ (http://neuro.imm.dtu.dk/wiki/AFINN)
+
+ Usage: network_wordjoinsentiments.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+ `$ nc -lk 9999`
+ and then run the example
+ `$ bin/spark-submit examples/src/main/python/streaming/network_wordjoinsentiments.py \
+ localhost 9999`
+"""
+
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+
+def print_happiest_words(rdd):
+ top_list = rdd.take(5)
+ print("Happiest topics in the last 5 seconds (%d total):" % rdd.count())
+ for tuple in top_list:
+ print("%s (%d happiness)" % (tuple[1], tuple[0]))
+
+if __name__ == "__main__":
+ if len(sys.argv) != 3:
+ print("Usage: network_wordjoinsentiments.py <hostname> <port>", file=sys.stderr)
+ exit(-1)
+
+ sc = SparkContext(appName="PythonStreamingNetworkWordJoinSentiments")
+ ssc = StreamingContext(sc, 5)
+
+ # Read in the word-sentiment list and create a static RDD from it
+ word_sentiments_file_path = "data/streaming/AFINN-111.txt"
+ word_sentiments = ssc.sparkContext.textFile(word_sentiments_file_path) \
+ .map(lambda line: tuple(line.split("\t")))
+
+ lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
+
+ word_counts = lines.flatMap(lambda line: line.split(" ")) \
+ .map(lambda word: (word, 1)) \
+ .reduceByKey(lambda a, b: a + b)
+
+ # Determine the words with the highest sentiment values by joining the streaming RDD
+ # with the static RDD inside the transform() method and then multiplying
+ # the frequency of the words by its sentiment value
+ happiest_words = word_counts.transform(lambda rdd: word_sentiments.join(rdd)) \
+ .map(lambda (word, tuple): (word, float(tuple[0]) * tuple[1])) \
+ .map(lambda (word, happiness): (happiness, word)) \
+ .transform(lambda rdd: rdd.sortByKey(False))
+
+ happiest_words.foreachRDD(print_happiest_words)
+
+ ssc.start()
+ ssc.awaitTermination()
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
new file mode 100644
index 0000000000..0328fa81ea
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.twitter.TwitterUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
+ * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
+ */
+object TwitterHashTagJoinSentiments {
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " +
+ "<access token> <access token secret> [<filters>]")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+ val filters = args.takeRight(args.length - 4)
+
+ // Set the system properties so that Twitter4j library used by Twitter stream
+ // can use them to generate OAuth credentials
+ System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+ System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+ System.setProperty("twitter4j.oauth.accessToken", accessToken)
+ System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
+ val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+ val stream = TwitterUtils.createStream(ssc, None, filters)
+
+ val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
+
+ // Read in the word-sentiment list and create a static RDD from it
+ val wordSentimentFilePath = "data/streaming/AFINN-111.txt"
+ val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
+ val Array(word, happinessValue) = line.split("\t")
+ (word, happinessValue)
+ } cache()
+
+ // Determine the hash tags with the highest sentiment values by joining the streaming RDD
+ // with the static RDD inside the transform() method and then multiplying
+ // the frequency of the hash tag by its sentiment value
+ val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
+ .reduceByKeyAndWindow(_ + _, Seconds(60))
+ .transform{topicCount => wordSentiments.join(topicCount)}
+ .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
+ .map{case (topic, happinessValue) => (happinessValue, topic)}
+ .transform(_.sortByKey(false))
+
+ val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1))
+ .reduceByKeyAndWindow(_ + _, Seconds(10))
+ .transform{topicCount => wordSentiments.join(topicCount)}
+ .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
+ .map{case (topic, happinessValue) => (happinessValue, topic)}
+ .transform(_.sortByKey(false))
+
+ // Print hash tags with the most positive sentiment values
+ happiest60.foreachRDD(rdd => {
+ val topList = rdd.take(10)
+ println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
+ })
+
+ happiest10.foreachRDD(rdd => {
+ val topList = rdd.take(10)
+ println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
+ })
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+// scalastyle:on println