aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala
diff options
context:
space:
mode:
authorJeff L <sha0lin@alumni.carnegiemellon.edu>2015-12-18 15:06:54 +0000
committerSean Owen <sowen@cloudera.com>2015-12-18 15:06:54 +0000
commitea59b0f3a6600f8046e5f3f55e89257614fb1f10 (patch)
treef2a7a4df2c5ece58253b98a0a60b598730f91531 /examples/src/main/scala
parent2bebaa39d9da33bc93ef682959cd42c1968a6a3e (diff)
downloadspark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.gz
spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.tar.bz2
spark-ea59b0f3a6600f8046e5f3f55e89257614fb1f10.zip
[SPARK-9057][STREAMING] Twitter example joining to static RDD of word sentiment values
Example of joining a static RDD of word sentiments to a streaming RDD of Tweets in order to demo the usage of the transform() method. Author: Jeff L <sha0lin@alumni.carnegiemellon.edu> Closes #8431 from Agent007/SPARK-9057.
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala96
1 files changed, 96 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
new file mode 100644
index 0000000000..0328fa81ea
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.twitter.TwitterUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
+ * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
+ */
+object TwitterHashTagJoinSentiments {
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " +
+ "<access token> <access token secret> [<filters>]")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+ val filters = args.takeRight(args.length - 4)
+
+ // Set the system properties so that Twitter4j library used by Twitter stream
+ // can use them to generate OAuth credentials
+ System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+ System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+ System.setProperty("twitter4j.oauth.accessToken", accessToken)
+ System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
+ val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+ val stream = TwitterUtils.createStream(ssc, None, filters)
+
+ val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
+
+ // Read in the word-sentiment list and create a static RDD from it
+ val wordSentimentFilePath = "data/streaming/AFINN-111.txt"
+ val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
+ val Array(word, happinessValue) = line.split("\t")
+ (word, happinessValue)
+ } cache()
+
+ // Determine the hash tags with the highest sentiment values by joining the streaming RDD
+ // with the static RDD inside the transform() method and then multiplying
+ // the frequency of the hash tag by its sentiment value
+ val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
+ .reduceByKeyAndWindow(_ + _, Seconds(60))
+ .transform{topicCount => wordSentiments.join(topicCount)}
+ .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
+ .map{case (topic, happinessValue) => (happinessValue, topic)}
+ .transform(_.sortByKey(false))
+
+ val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1))
+ .reduceByKeyAndWindow(_ + _, Seconds(10))
+ .transform{topicCount => wordSentiments.join(topicCount)}
+ .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
+ .map{case (topic, happinessValue) => (happinessValue, topic)}
+ .transform(_.sortByKey(false))
+
+ // Print hash tags with the most positive sentiment values
+ happiest60.foreachRDD(rdd => {
+ val topList = rdd.take(10)
+ println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
+ })
+
+ happiest10.foreachRDD(rdd => {
+ val topList = rdd.take(10)
+ println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
+ })
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+// scalastyle:on println