aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/python/streaming/network_wordjoinsentiments.py
diff options
context:
space:
mode:
authorJeff L <sha0lin@alumni.carnegiemellon.edu>2015-12-18 15:06:54 +0000
committerSean Owen <sowen@cloudera.com>2015-12-18 15:06:54 +0000
commitea59b0f3a6600f8046e5f3f55e89257614fb1f10 (patch)
treef2a7a4df2c5ece58253b98a0a60b598730f91531 /examples/src/main/python/streaming/network_wordjoinsentiments.py
parent2bebaa39d9da33bc93ef682959cd42c1968a6a3e (diff)
downloadspark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.gz
spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.bz2
spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.zip
[SPARK-9057][STREAMING] Twitter example joining to static RDD of word sentiment values
Example of joining a static RDD of word sentiments to a streaming RDD of Tweets in order to demo the usage of the transform() method. Author: Jeff L <sha0lin@alumni.carnegiemellon.edu> Closes #8431 from Agent007/SPARK-9057.
Diffstat (limited to 'examples/src/main/python/streaming/network_wordjoinsentiments.py')
-rw-r--r--examples/src/main/python/streaming/network_wordjoinsentiments.py77
1 files changed, 77 insertions, 0 deletions
diff --git a/examples/src/main/python/streaming/network_wordjoinsentiments.py b/examples/src/main/python/streaming/network_wordjoinsentiments.py
new file mode 100644
index 0000000000..b85517dfdd
--- /dev/null
+++ b/examples/src/main/python/streaming/network_wordjoinsentiments.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Shows the most positive words in UTF8 encoded, '\n' delimited text directly received from the network
+ every 5 seconds. The streaming data is joined with a static RDD of the AFINN word list
+ (http://neuro.imm.dtu.dk/wiki/AFINN)
+
+ Usage: network_wordjoinsentiments.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+ `$ nc -lk 9999`
+ and then run the example
+ `$ bin/spark-submit examples/src/main/python/streaming/network_wordjoinsentiments.py \
+ localhost 9999`
+"""
+
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+
def print_happiest_words(rdd):
    """Print the top five (happiness, word) pairs from *rdd*.

    The RDD is expected to be sorted by descending happiness score (the
    pipeline sorts by key before handing batches to this callback), so
    take(5) yields the happiest entries.

    NOTE: rdd.count() launches a second Spark job over the same data just
    to report the total — acceptable for a demo, costly in production.
    """
    top_list = rdd.take(5)
    print("Happiest topics in the last 5 seconds (%d total):" % rdd.count())
    # Unpack each (score, word) pair directly; the original indexed into a
    # variable named `tuple`, which shadowed the builtin of the same name.
    for happiness, word in top_list:
        print("%s (%d happiness)" % (word, happiness))
+
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordjoinsentiments.py <hostname> <port>", file=sys.stderr)
        # sys.exit is explicit and importable-safe; bare exit() is only meant
        # for interactive sessions. Exit code kept identical to the original.
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordJoinSentiments")
    ssc = StreamingContext(sc, 5)

    # Read in the word-sentiment list and create a static RDD from it.
    # Each AFINN line is "word<TAB>score", giving (word, score_str) pairs.
    word_sentiments_file_path = "data/streaming/AFINN-111.txt"
    word_sentiments = ssc.sparkContext.textFile(word_sentiments_file_path) \
        .map(lambda line: tuple(line.split("\t")))

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    # Per-batch word counts: (word, count) pairs.
    word_counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)

    # Determine the words with the highest sentiment values by joining the
    # streaming RDD with the static RDD inside transform() and multiplying
    # each word's frequency by its sentiment value.
    # NOTE: tuple-parameter unpacking in lambdas (`lambda (a, b): ...`) was
    # removed in Python 3 (PEP 3113); index into the joined pair
    # (word, (sentiment_str, count)) explicitly instead.
    happiest_words = word_counts.transform(lambda rdd: word_sentiments.join(rdd)) \
        .map(lambda word_tuples: (word_tuples[0],
                                  float(word_tuples[1][0]) * word_tuples[1][1])) \
        .map(lambda word_happiness: (word_happiness[1], word_happiness[0])) \
        .transform(lambda rdd: rdd.sortByKey(False))

    happiest_words.foreachRDD(print_happiest_words)

    ssc.start()
    ssc.awaitTermination()