author     Shixiong Zhu <shixiong@databricks.com>   2016-03-14 16:56:04 -0700
committer  Reynold Xin <rxin@databricks.com>        2016-03-14 16:56:04 -0700
commit     06dec37455c3f800897defee6fad0da623f26050 (patch)
tree       d49dd098587f8a3c7a019b0aad605327da6fcecd /examples/src/main
parent     8301fadd8d269da11e72870b7a889596e3337839 (diff)
[SPARK-13843][STREAMING] Move streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages
## What changes were proposed in this pull request?

Currently there are a few sub-projects, each integrating with a different external source for Streaming. Now that we have a better way to include external libraries (Spark packages), and with Spark 2.0 coming up, we can move the following projects out of Spark to https://github.com/spark-packages:

- streaming-flume
- streaming-akka
- streaming-mqtt
- streaming-zeromq
- streaming-twitter

These are ancillary packages, and given the overhead of maintaining them, running their tests, and dealing with PR failures, it is better to maintain them outside of Spark. In addition, these projects can then follow their own release cycles, so we can release them faster. I have already copied these projects to https://github.com/spark-packages.

## How was this patch tested?

Jenkins tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11672 from zsxwing/remove-external-pkg.
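For applications that used these bundled connectors, the practical impact is a build change rather than a code change: once a connector is published as a Spark package, it is pulled in as an ordinary external dependency. A minimal build sketch, assuming hypothetical artifact coordinates (the group ID, artifact name, and version actually published under https://github.com/spark-packages may differ):

```scala
// build.sbt (sketch) -- the coordinates below are placeholders, not the
// published artifacts; check the spark-packages listing for the real ones.
libraryDependencies ++= Seq(
  // Spark Streaming itself remains a regular Spark dependency
  "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
  // hypothetical coordinate for the externalized Akka connector
  "org.spark-packages" %% "spark-streaming-akka" % "0.1.0"
)
```

Such a package could also be supplied at launch time via spark-submit's `--packages groupId:artifactId:version` flag, which resolves the Maven coordinates when the application is submitted.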
Diffstat (limited to 'examples/src/main')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java                |  144
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java               |   75
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java  |  175
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala                  |  175
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala                 |   70
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala          |   67
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala                   |  119
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala              |  116
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala              |   94
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala    |   96
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala              |   85
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala                 |  105
12 files changed, 0 insertions, 1321 deletions
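Because only the artifacts move out of the repository, application code similar to the deleted examples below should keep working once the external connector is on the classpath. A minimal, hypothetical word-count sketch against the MQTT connector, assuming the externalized artifact continues to expose org.apache.spark.streaming.mqtt.MQTTUtils with the same createStream signature as the code removed here:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// assumes the externalized connector still ships this package and object
import org.apache.spark.streaming.mqtt.MQTTUtils

object ExternalMQTTWordCount {
  def main(args: Array[String]): Unit = {
    val Array(brokerUrl, topic) = args
    val sparkConf = new SparkConf().setAppName("ExternalMQTTWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // createStream(ssc, brokerUrl, topic) uses the connector's default storage level
    val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```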
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
deleted file mode 100644
index 7884b8cdff..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import scala.Tuple2;
-
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.akka.AkkaUtils;
-import org.apache.spark.streaming.akka.JavaActorReceiver;
-
-/**
- * A sample actor used as a receiver; it is also the simplest. This
- * receiver actor subscribes to a typical publisher/feeder actor and
- * receives data.
- *
- * @see [[org.apache.spark.examples.streaming.FeederActor]]
- */
-class JavaSampleActorReceiver<T> extends JavaActorReceiver {
-
- private final String urlOfPublisher;
-
- public JavaSampleActorReceiver(String urlOfPublisher) {
- this.urlOfPublisher = urlOfPublisher;
- }
-
- private ActorSelection remotePublisher;
-
- @Override
- public void preStart() {
- remotePublisher = getContext().actorSelection(urlOfPublisher);
- remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
- }
-
- @Override
- public void onReceive(Object msg) throws Exception {
- @SuppressWarnings("unchecked")
- T msgT = (T) msg;
- store(msgT);
- }
-
- @Override
- public void postStop() {
- remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf());
- }
-}
-
-/**
- * A sample word count program demonstrating how to plug in an
- * Actor as the Receiver.
- * Usage: JavaActorWordCount <hostname> <port>
- * <hostname> and <port> describe the Akka ActorSystem that the Spark sample feeder is running on.
- *
- * To run this example locally, you may run Feeder Actor as
- * <code><pre>
- * $ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999
- * </pre></code>
- * and then run the example
- * <code><pre>
- * $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount localhost 9999
- * </pre></code>
- */
-public class JavaActorWordCount {
-
- public static void main(String[] args) {
- if (args.length < 2) {
- System.err.println("Usage: JavaActorWordCount <hostname> <port>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- final String host = args[0];
- final String port = args[1];
- SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount");
- // Create the context and set the batch size
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
- String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
-
- /*
- * The following uses AkkaUtils.createStream to plug in a custom actor as the receiver.
- *
- * An important point to note:
- * Since the actor may exist outside the Spark framework, it is the user's responsibility
- * to ensure type safety, i.e. the type of the data received and of the InputDStream
- * should be the same.
- *
- * For example: both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
- * with the same type to ensure type safety.
- */
- JavaDStream<String> lines = AkkaUtils.createStream(
- jssc,
- Props.create(JavaSampleActorReceiver.class, feederActorURI),
- "SampleReceiver");
-
- // compute wordcount
- lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) {
- return Arrays.asList(s.split("\\s+")).iterator();
- }
- }).mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }).print();
-
- jssc.start();
- jssc.awaitTermination();
- }
-}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
deleted file mode 100644
index da56637fe8..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server at the requested host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <host> <port>
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
- */
-public final class JavaFlumeEventCount {
- private JavaFlumeEventCount() {
- }
-
- public static void main(String[] args) {
- if (args.length != 2) {
- System.err.println("Usage: JavaFlumeEventCount <host> <port>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- String host = args[0];
- int port = Integer.parseInt(args[1]);
-
- Duration batchInterval = new Duration(2000);
- SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
- JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
- JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
-
- flumeStream.count();
-
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
-
- ssc.start();
- ssc.awaitTermination();
- }
-}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
deleted file mode 100644
index f0ae9a99ba..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.twitter.TwitterUtils;
-import scala.Tuple2;
-import twitter4j.Status;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-public class JavaTwitterHashTagJoinSentiments {
-
- public static void main(String[] args) {
- if (args.length < 4) {
- System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
- " <access token> <access token secret> [<filters>]");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- String consumerKey = args[0];
- String consumerSecret = args[1];
- String accessToken = args[2];
- String accessTokenSecret = args[3];
- String[] filters = Arrays.copyOfRange(args, 4, args.length);
-
- // Set the system properties so that Twitter4j library used by Twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
- System.setProperty("twitter4j.oauth.accessToken", accessToken);
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
- JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
-
- JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
- @Override
- public Iterator<String> call(Status s) {
- return Arrays.asList(s.getText().split(" ")).iterator();
- }
- });
-
- JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
- @Override
- public Boolean call(String word) {
- return word.startsWith("#");
- }
- });
-
- // Read in the word-sentiment list and create a static RDD from it
- String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
- final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath)
- .mapToPair(new PairFunction<String, String, Double>(){
- @Override
- public Tuple2<String, Double> call(String line) {
- String[] columns = line.split("\t");
- return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
- }
- });
-
- JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- // leave out the # character
- return new Tuple2<>(s.substring(1), 1);
- }
- });
-
- JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- }, new Duration(10000));
-
- // Determine the hash tags with the highest sentiment values by joining the streaming RDD
- // with the static RDD inside the transform() method and then multiplying
- // the frequency of the hash tag by its sentiment value
- JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
- hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
- JavaPairRDD<String, Tuple2<Double, Integer>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<Double, Integer>> call(
- JavaPairRDD<String, Integer> topicCount) {
- return wordSentiments.join(topicCount);
- }
- });
-
- JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
- new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
- @Override
- public Tuple2<String, Double> call(Tuple2<String,
- Tuple2<Double, Integer>> topicAndTuplePair) {
- Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
- return new Tuple2<>(topicAndTuplePair._1(),
- happinessAndCount._1() * happinessAndCount._2());
- }
- });
-
- JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
- new PairFunction<Tuple2<String, Double>, Double, String>() {
- @Override
- public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) {
- return new Tuple2<>(topicHappiness._2(),
- topicHappiness._1());
- }
- });
-
- JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
- new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
- @Override
- public JavaPairRDD<Double, String> call(
- JavaPairRDD<Double, String> happinessAndTopics) {
- return happinessAndTopics.sortByKey(false);
- }
- }
- );
-
- // Print hash tags with the most positive sentiment values
- happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
- @Override
- public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
- List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
- System.out.println(
- String.format("\nHappiest topics in last 10 seconds (%s total):",
- happinessTopicPairs.count()));
- for (Tuple2<Double, String> pair : topList) {
- System.out.println(
- String.format("%s (%s happiness)", pair._2(), pair._1()));
- }
- }
- });
-
- jssc.start();
- jssc.awaitTermination();
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
deleted file mode 100644
index 844772a289..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import scala.collection.mutable.LinkedHashSet
-import scala.util.Random
-
-import akka.actor._
-import com.typesafe.config.ConfigFactory
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
-
-case class SubscribeReceiver(receiverActor: ActorRef)
-case class UnsubscribeReceiver(receiverActor: ActorRef)
-
-/**
- * Sends random content to every subscribed receiver with a half-second
- * delay.
- */
-class FeederActor extends Actor {
-
- val rand = new Random()
- val receivers = new LinkedHashSet[ActorRef]()
-
- val strings: Array[String] = Array("words ", "may ", "count ")
-
- def makeMessage(): String = {
- val x = rand.nextInt(3)
- strings(x) + strings(2 - x)
- }
-
- /*
- * A thread to generate random messages
- */
- new Thread() {
- override def run() {
- while (true) {
- Thread.sleep(500)
- receivers.foreach(_ ! makeMessage)
- }
- }
- }.start()
-
- def receive: Receive = {
- case SubscribeReceiver(receiverActor: ActorRef) =>
- println("received subscribe from %s".format(receiverActor.toString))
- receivers += receiverActor
-
- case UnsubscribeReceiver(receiverActor: ActorRef) =>
- println("received unsubscribe from %s".format(receiverActor.toString))
- receivers -= receiverActor
- }
-}
-
-/**
- * A sample actor used as a receiver; it is also the simplest. This
- * receiver actor subscribes to a typical publisher/feeder actor and
- * receives data.
- *
- * @see [[org.apache.spark.examples.streaming.FeederActor]]
- */
-class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
-
- lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
-
- override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)
-
- def receive: PartialFunction[Any, Unit] = {
- case msg => store(msg.asInstanceOf[T])
- }
-
- override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
-
-}
-
-/**
- * A sample feeder actor
- *
- * Usage: FeederActor <hostname> <port>
- * <hostname> and <port> describe the Akka ActorSystem that the Spark sample feeder would start on.
- */
-object FeederActor {
-
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: FeederActor <hostname> <port>\n")
- System.exit(1)
- }
- val Seq(host, port) = args.toSeq
-
- val akkaConf = ConfigFactory.parseString(
- s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
- |akka.remote.netty.tcp.hostname = "$host"
- |akka.remote.netty.tcp.port = $port
- |""".stripMargin)
- val actorSystem = ActorSystem("test", akkaConf)
- val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
-
- println("Feeder started as:" + feeder)
-
- actorSystem.awaitTermination()
- }
-}
-
-/**
- * A sample word count program demonstrating how to plug in an
- * Actor as the Receiver.
- *
- * Usage: ActorWordCount <hostname> <port>
- * <hostname> and <port> describe the Akka ActorSystem that the Spark sample feeder is running on.
- *
- * To run this example locally, you may run Feeder Actor as
- * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999`
- * and then run the example
- * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount localhost 9999`
- */
-object ActorWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println(
- "Usage: ActorWordCount <hostname> <port>")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- val Seq(host, port) = args.toSeq
- val sparkConf = new SparkConf().setAppName("ActorWordCount")
- // Create the context and set the batch size
- val ssc = new StreamingContext(sparkConf, Seconds(2))
-
- /*
- * The following uses AkkaUtils.createStream to plug in a custom actor as the receiver.
- *
- * An important point to note:
- * Since the actor may exist outside the Spark framework, it is the user's responsibility
- * to ensure type safety, i.e. the type of the data received and of the InputDStream
- * should be the same.
- *
- * For example: both AkkaUtils.createStream and SampleActorReceiver are parameterized
- * with the same type to ensure type safety.
- */
- val lines = AkkaUtils.createStream[String](
- ssc,
- Props(classOf[SampleActorReceiver[String]],
- "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
- "SampleReceiver")
-
- // compute wordcount
- lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
deleted file mode 100644
index 91e52e4eff..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server at the requested host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: FlumeEventCount <host> <port>
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
- */
-object FlumeEventCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println(
- "Usage: FlumeEventCount <host> <port>")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- val Array(host, IntParam(port)) = args
-
- val batchInterval = Milliseconds(2000)
-
- // Create the context and set the batch size
- val sparkConf = new SparkConf().setAppName("FlumeEventCount")
- val ssc = new StreamingContext(sparkConf, batchInterval)
-
- // Create a flume stream
- val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
-
- // Print out the count of events received from this server in each batch
- stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
deleted file mode 100644
index dd725d72c2..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with the Spark Sink running in a Flume agent. See
- * the Spark Streaming programming guide for more details.
- *
- * Usage: FlumePollingEventCount <host> <port>
- * `host` is the host on which the Spark Sink is running.
- * `port` is the port at which the Spark Sink is listening.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
- */
-object FlumePollingEventCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println(
- "Usage: FlumePollingEventCount <host> <port>")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- val Array(host, IntParam(port)) = args
-
- val batchInterval = Milliseconds(2000)
-
- // Create the context and set the batch size
- val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
- val ssc = new StreamingContext(sparkConf, batchInterval)
-
- // Create a flume stream that polls the Spark Sink running in a Flume agent
- val stream = FlumeUtils.createPollingStream(ssc, host, port)
-
- // Print out the count of events received from this server in each batch
- stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
deleted file mode 100644
index d772ae309f..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.mqtt._
-import org.apache.spark.SparkConf
-
-/**
- * A simple MQTT publisher for demonstration purposes; it repeatedly publishes
- * the space-separated String message "hello mqtt demo for spark streaming".
- */
-object MQTTPublisher {
-
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- val Seq(brokerUrl, topic) = args.toSeq
-
- var client: MqttClient = null
-
- try {
- val persistence = new MemoryPersistence()
- client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
-
- client.connect()
-
- val msgtopic = client.getTopic(topic)
- val msgContent = "hello mqtt demo for spark streaming"
- val message = new MqttMessage(msgContent.getBytes("utf-8"))
-
- while (true) {
- try {
- msgtopic.publish(message)
- println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
- } catch {
- case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
- Thread.sleep(10)
- println("Queue is full, wait for to consume data from the message queue")
- }
- }
- } catch {
- case e: MqttException => println("Exception Caught: " + e)
- } finally {
- if (client != null) {
- client.disconnect()
- }
- }
- }
-}
-
-/**
- * A sample word count over an MQTT stream.
- *
- * To work with MQTT, an MQTT message broker/server is required.
- * Mosquitto (http://mosquitto.org/) is an open source MQTT broker.
- * On Ubuntu, mosquitto can be installed using the command `$ sudo apt-get install mosquitto`.
- * The Eclipse Paho project provides a Java library for MQTT clients: http://www.eclipse.org/paho/
- * Example Java code for an MQTT publisher and subscriber can be found at
- * https://bitbucket.org/mkjinesh/mqttclient
- * Usage: MQTTWordCount <MqttbrokerUrl> <topic>
- * <MqttbrokerUrl> and <topic> describe where the MQTT publisher is running.
- *
- * To run this example locally, you may run publisher as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
- * and run the example as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
- */
-object MQTTWordCount {
-
- def main(args: Array[String]) {
- if (args.length < 2) {
- // scalastyle:off println
- System.err.println(
- "Usage: MQTTWordCount <MqttbrokerUrl> <topic>")
- // scalastyle:on println
- System.exit(1)
- }
-
- val Seq(brokerUrl, topic) = args.toSeq
- val sparkConf = new SparkConf().setAppName("MQTTWordCount")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
- val words = lines.flatMap(x => x.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-
- wordCounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
deleted file mode 100644
index 5af82e161a..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird._
-import com.twitter.algebird.CMSHasherImplicits._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- * <br>
- * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
- * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- * the same approach could be used for computing popular topics for example.
- * <p>
- * <p>
- * <a href=
- * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- * of any given element, etc), that uses space sub-linear in the number of elements in the
- * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- * count.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdCMS {
- def main(args: Array[String]) {
- StreamingExamples.setStreamingLogLevels()
-
- // CMS parameters
- val DELTA = 1E-3
- val EPS = 0.01
- val SEED = 1
- val PERC = 0.001
- // K highest frequency elements to take
- val TOPK = 10
-
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
- val ssc = new StreamingContext(sparkConf, Seconds(10))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
-
- val users = stream.map(status => status.getUser.getId)
-
- // val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
- val cms = TopPctCMS.monoid[Long](EPS, DELTA, SEED, PERC)
- var globalCMS = cms.zero
- val mm = new MapMonoid[Long, Int]()
- var globalExact = Map[Long, Int]()
-
- val approxTopUsers = users.mapPartitions(ids => {
- ids.map(id => cms.create(id))
- }).reduce(_ ++ _)
-
- val exactTopUsers = users.map(id => (id, 1))
- .reduceByKey((a, b) => a + b)
-
- approxTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- val partialTopK = partial.heavyHitters.map(id =>
- (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- globalCMS ++= partial
- val globalTopK = globalCMS.heavyHitters.map(id =>
- (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
- partialTopK.mkString("[", ",", "]")))
- println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
- globalTopK.mkString("[", ",", "]")))
- }
- })
-
- exactTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partialMap = rdd.collect().toMap
- val partialTopK = rdd.map(
- {case (id, count) => (count, id)})
- .sortByKey(ascending = false).take(TOPK)
- globalExact = mm.plus(globalExact.toMap, partialMap)
- val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
- println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
- }
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
deleted file mode 100644
index 6442b2a4e2..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird.HyperLogLog._
-import com.twitter.algebird.HyperLogLogMonoid
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- * <p>
- * <p>
- * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * blog post</a> and this
- * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
- * blog post</a>
- * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for
- * estimating the cardinality of a data stream, i.e. the number of unique elements.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdHLL {
- def main(args: Array[String]) {
-
- StreamingExamples.setStreamingLogLevels()
-
- /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
- val BIT_SIZE = 12
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
- val ssc = new StreamingContext(sparkConf, Seconds(5))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
-
- val users = stream.map(status => status.getUser.getId)
-
- val hll = new HyperLogLogMonoid(BIT_SIZE)
- var globalHll = hll.zero
- var userSet: Set[Long] = Set()
-
- val approxUsers = users.mapPartitions(ids => {
- ids.map(id => hll.create(id))
- }).reduce(_ + _)
-
- val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
-
- approxUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- globalHll += partial
- println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
- println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
- }
- })
-
- exactUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- userSet ++= partial
- println("Exact distinct users this batch: %d".format(partial.size))
- println("Exact distinct users overall: %d".format(userSet.size))
- println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
- ) * 100))
- }
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
deleted file mode 100644
index a8d392ca35..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter.TwitterUtils
-
-/**
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-object TwitterHashTagJoinSentiments {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " +
- "<access token> <access token secret> [<filters>]")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
- val filters = args.takeRight(args.length - 4)
-
- // Set the system properties so that Twitter4j library used by Twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
- System.setProperty("twitter4j.oauth.accessToken", accessToken)
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
-
- val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- val stream = TwitterUtils.createStream(ssc, None, filters)
-
- val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
-
- // Read in the word-sentiment list and create a static RDD from it
- val wordSentimentFilePath = "data/streaming/AFINN-111.txt"
- val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
- val Array(word, happinessValue) = line.split("\t")
- (word, happinessValue.toInt)
- }.cache()
-
- // Determine the hash tags with the highest sentiment values by joining the streaming RDD
- // with the static RDD inside the transform() method and then multiplying
- // the frequency of the hash tag by its sentiment value
- val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
- .reduceByKeyAndWindow(_ + _, Seconds(60))
- .transform{topicCount => wordSentiments.join(topicCount)}
- .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
- .map{case (topic, happinessValue) => (happinessValue, topic)}
- .transform(_.sortByKey(false))
-
- val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1))
- .reduceByKeyAndWindow(_ + _, Seconds(10))
- .transform{topicCount => wordSentiments.join(topicCount)}
- .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
- .map{case (topic, happinessValue) => (happinessValue, topic)}
- .transform(_.sortByKey(false))
-
- // Print hash tags with the most positive sentiment values
- happiest60.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
- })
-
- happiest10.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
deleted file mode 100644
index 5b69963cc8..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
-
-/**
- * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
- * stream. The stream is instantiated with credentials and optionally filters supplied by the
- * command line arguments.
- *
- * Run this on your local machine as
- *
- */
-object TwitterPopularTags {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
- "<access token> <access token secret> [<filters>]")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
- val filters = args.takeRight(args.length - 4)
-
- // Set the system properties so that Twitter4j library used by twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
- System.setProperty("twitter4j.oauth.accessToken", accessToken)
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
-
- val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- val stream = TwitterUtils.createStream(ssc, None, filters)
-
- val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
-
- val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
- .map{case (topic, count) => (count, topic)}
- .transform(_.sortByKey(false))
-
- val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
- .map{case (topic, count) => (count, topic)}
- .transform(_.sortByKey(false))
-
-
- // Print popular hashtags
- topCounts60.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- })
-
- topCounts10.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
deleted file mode 100644
index 99b561750b..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import scala.language.implicitConversions
-
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-import akka.zeromq._
-import akka.zeromq.Subscribe
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.zeromq._
-
-/**
- * A simple publisher for demonstration purposes; it repeatedly publishes random
- * messages every second.
- */
-object SimpleZeroMQPublisher {
-
- def main(args: Array[String]): Unit = {
- if (args.length < 2) {
- System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
- System.exit(1)
- }
-
- val Seq(url, topic) = args.toSeq
- val acs: ActorSystem = ActorSystem()
-
- val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
- implicit def stringToByteString(x: String): ByteString = ByteString(x)
- val messages: List[ByteString] = List("words ", "may ", "count ")
- while (true) {
- Thread.sleep(1000)
- pubSocket ! ZMQMessage(ByteString(topic) :: messages)
- }
- acs.awaitTermination()
- }
-}
-
-// scalastyle:off
-/**
- * A sample word count over a ZeroMQ stream.
- *
- * To work with ZeroMQ, some native libraries have to be installed.
- * Install the ZeroMQ (release 2.1) core libraries. [ZeroMQ install guide]
- * (http://www.zeromq.org/intro:get-the-software)
- *
- * Usage: ZeroMQWordCount <zeroMQurl> <topic>
- * <zeroMQurl> and <topic> describe where the ZeroMQ publisher is running.
- *
- * To run this example locally, you may run publisher as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
- * and run the example as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
- */
-// scalastyle:on
-object ZeroMQWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Seq(url, topic) = args.toSeq
- val sparkConf = new SparkConf().setAppName("ZeroMQWordCount")
- // Create the context and set the batch size
- val ssc = new StreamingContext(sparkConf, Seconds(2))
-
- def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
-
- // For this stream, a zeroMQ publisher should be running.
- val lines = ZeroMQUtils.createStream(
- ssc,
- url,
- Subscribe(topic),
- bytesToStringIterator _)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
-}
-// scalastyle:on println