path: root/examples
diff options
Diffstat (limited to 'examples')
13 files changed, 0 insertions, 1352 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index 3a3f547915..92bb373c73 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -67,37 +67,6 @@
- <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.spark-project.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
deleted file mode 100644
index 7884b8cdff..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ /dev/null
@@ -1,144 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.examples.streaming;
-import java.util.Arrays;
-import java.util.Iterator;
-import scala.Tuple2;
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.akka.AkkaUtils;
-import org.apache.spark.streaming.akka.JavaActorReceiver;
- * A sample actor as receiver, is also simplest. This receiver actor
- * goes and subscribe to a typical publisher/feeder actor and receives
- * data.
- *
- * @see [[org.apache.spark.examples.streaming.FeederActor]]
- */
-class JavaSampleActorReceiver<T> extends JavaActorReceiver {
- private final String urlOfPublisher;
- public JavaSampleActorReceiver(String urlOfPublisher) {
- this.urlOfPublisher = urlOfPublisher;
- }
- private ActorSelection remotePublisher;
- @Override
- public void preStart() {
- remotePublisher = getContext().actorSelection(urlOfPublisher);
- remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
- }
- @Override
- public void onReceive(Object msg) throws Exception {
- @SuppressWarnings("unchecked")
- T msgT = (T) msg;
- store(msgT);
- }
- @Override
- public void postStop() {
- remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf());
- }
- * A sample word count program demonstrating the use of plugging in
- * Actor as Receiver
- * Usage: JavaActorWordCount <hostname> <port>
- * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
- *
- * To run this example locally, you may run Feeder Actor as
- * <code><pre>
- * $ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999
- * </pre></code>
- * and then run the example
- * <code><pre>
- * $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount localhost 9999
- * </pre></code>
- */
-public class JavaActorWordCount {
- public static void main(String[] args) {
- if (args.length < 2) {
- System.err.println("Usage: JavaActorWordCount <hostname> <port>");
- System.exit(1);
- }
- StreamingExamples.setStreamingLogLevels();
- final String host = args[0];
- final String port = args[1];
- SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount");
- // Create the context and set the batch size
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
- String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
- /*
- * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
- *
- * An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e type of data received and InputDstream
- * should be same.
- *
- * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
- * to same type to ensure type safety.
- */
- JavaDStream<String> lines = AkkaUtils.createStream(
- jssc,
- Props.create(JavaSampleActorReceiver.class, feederActorURI),
- "SampleReceiver");
- // compute wordcount
- lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) {
- return Arrays.asList(s.split("\\s+")).iterator();
- }
- }).mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }).print();
- jssc.start();
- jssc.awaitTermination();
- }
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
deleted file mode 100644
index da56637fe8..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,75 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.examples.streaming;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server on at the request host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <host> <port>
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
- */
-public final class JavaFlumeEventCount {
- private JavaFlumeEventCount() {
- }
- public static void main(String[] args) {
- if (args.length != 2) {
- System.err.println("Usage: JavaFlumeEventCount <host> <port>");
- System.exit(1);
- }
- StreamingExamples.setStreamingLogLevels();
- String host = args[0];
- int port = Integer.parseInt(args[1]);
- Duration batchInterval = new Duration(2000);
- SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
- JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
- JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
- flumeStream.count();
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
- ssc.start();
- ssc.awaitTermination();
- }
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
deleted file mode 100644
index f0ae9a99ba..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ /dev/null
@@ -1,175 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.examples.streaming;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.twitter.TwitterUtils;
-import scala.Tuple2;
-import twitter4j.Status;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-public class JavaTwitterHashTagJoinSentiments {
- public static void main(String[] args) {
- if (args.length < 4) {
- System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
- " <access token> <access token secret> [<filters>]");
- System.exit(1);
- }
- StreamingExamples.setStreamingLogLevels();
- String consumerKey = args[0];
- String consumerSecret = args[1];
- String accessToken = args[2];
- String accessTokenSecret = args[3];
- String[] filters = Arrays.copyOfRange(args, 4, args.length);
- // Set the system properties so that Twitter4j library used by Twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
- System.setProperty("twitter4j.oauth.accessToken", accessToken);
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
- SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
- JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
- JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
- @Override
- public Iterator<String> call(Status s) {
- return Arrays.asList(s.getText().split(" ")).iterator();
- }
- });
- JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
- @Override
- public Boolean call(String word) {
- return word.startsWith("#");
- }
- });
- // Read in the word-sentiment list and create a static RDD from it
- String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
- final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath)
- .mapToPair(new PairFunction<String, String, Double>(){
- @Override
- public Tuple2<String, Double> call(String line) {
- String[] columns = line.split("\t");
- return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
- }
- });
- JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- // leave out the # character
- return new Tuple2<>(s.substring(1), 1);
- }
- });
- JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- }, new Duration(10000));
- // Determine the hash tags with the highest sentiment values by joining the streaming RDD
- // with the static RDD inside the transform() method and then multiplying
- // the frequency of the hash tag by its sentiment value
- JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
- hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
- JavaPairRDD<String, Tuple2<Double, Integer>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<Double, Integer>> call(
- JavaPairRDD<String, Integer> topicCount) {
- return wordSentiments.join(topicCount);
- }
- });
- JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
- new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
- @Override
- public Tuple2<String, Double> call(Tuple2<String,
- Tuple2<Double, Integer>> topicAndTuplePair) {
- Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
- return new Tuple2<>(topicAndTuplePair._1(),
- happinessAndCount._1() * happinessAndCount._2());
- }
- });
- JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
- new PairFunction<Tuple2<String, Double>, Double, String>() {
- @Override
- public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) {
- return new Tuple2<>(topicHappiness._2(),
- topicHappiness._1());
- }
- });
- JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
- new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
- @Override
- public JavaPairRDD<Double, String> call(
- JavaPairRDD<Double, String> happinessAndTopics) {
- return happinessAndTopics.sortByKey(false);
- }
- }
- );
- // Print hash tags with the most positive sentiment values
- happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
- @Override
- public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
- List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
- System.out.println(
- String.format("\nHappiest topics in last 10 seconds (%s total):",
- happinessTopicPairs.count()));
- for (Tuple2<Double, String> pair : topList) {
- System.out.println(
- String.format("%s (%s happiness)", pair._2(), pair._1()));
- }
- }
- });
- jssc.start();
- jssc.awaitTermination();
- }
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
deleted file mode 100644
index 844772a289..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ /dev/null
@@ -1,175 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import scala.collection.mutable.LinkedHashSet
-import scala.util.Random
-import akka.actor._
-import com.typesafe.config.ConfigFactory
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
-case class SubscribeReceiver(receiverActor: ActorRef)
-case class UnsubscribeReceiver(receiverActor: ActorRef)
- * Sends the random content to every receiver subscribed with 1/2
- * second delay.
- */
-class FeederActor extends Actor {
- val rand = new Random()
- val receivers = new LinkedHashSet[ActorRef]()
- val strings: Array[String] = Array("words ", "may ", "count ")
- def makeMessage(): String = {
- val x = rand.nextInt(3)
- strings(x) + strings(2 - x)
- }
- /*
- * A thread to generate random messages
- */
- new Thread() {
- override def run() {
- while (true) {
- Thread.sleep(500)
- receivers.foreach(_ ! makeMessage)
- }
- }
- }.start()
- def receive: Receive = {
- case SubscribeReceiver(receiverActor: ActorRef) =>
- println("received subscribe from %s".format(receiverActor.toString))
- receivers += receiverActor
- case UnsubscribeReceiver(receiverActor: ActorRef) =>
- println("received unsubscribe from %s".format(receiverActor.toString))
- receivers -= receiverActor
- }
- * A sample actor as receiver, is also simplest. This receiver actor
- * goes and subscribe to a typical publisher/feeder actor and receives
- * data.
- *
- * @see [[org.apache.spark.examples.streaming.FeederActor]]
- */
-class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
- lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
- override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)
- def receive: PartialFunction[Any, Unit] = {
- case msg => store(msg.asInstanceOf[T])
- }
- override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
- * A sample feeder actor
- *
- * Usage: FeederActor <hostname> <port>
- * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
- */
-object FeederActor {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: FeederActor <hostname> <port>\n")
- System.exit(1)
- }
- val Seq(host, port) = args.toSeq
- val akkaConf = ConfigFactory.parseString(
- s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
- |akka.remote.netty.tcp.hostname = "$host"
- |akka.remote.netty.tcp.port = $port
- |""".stripMargin)
- val actorSystem = ActorSystem("test", akkaConf)
- val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
- println("Feeder started as:" + feeder)
- actorSystem.awaitTermination()
- }
- * A sample word count program demonstrating the use of plugging in
- *
- * Actor as Receiver
- * Usage: ActorWordCount <hostname> <port>
- * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
- *
- * To run this example locally, you may run Feeder Actor as
- * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999`
- * and then run the example
- * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount localhost 9999`
- */
-object ActorWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println(
- "Usage: ActorWordCount <hostname> <port>")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Seq(host, port) = args.toSeq
- val sparkConf = new SparkConf().setAppName("ActorWordCount")
- // Create the context and set the batch size
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- /*
- * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
- *
- * An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e type of data received and InputDStream
- * should be same.
- *
- * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
- * to same type to ensure type safety.
- */
- val lines = AkkaUtils.createStream[String](
- ssc,
- Props(classOf[SampleActorReceiver[String]],
- "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
- "SampleReceiver")
- // compute wordcount
- lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
deleted file mode 100644
index 91e52e4eff..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ /dev/null
@@ -1,70 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server on at the request host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: FlumeEventCount <host> <port>
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
- */
-object FlumeEventCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println(
- "Usage: FlumeEventCount <host> <port>")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Array(host, IntParam(port)) = args
- val batchInterval = Milliseconds(2000)
- // Create the context and set the batch size
- val sparkConf = new SparkConf().setAppName("FlumeEventCount")
- val ssc = new StreamingContext(sparkConf, batchInterval)
- // Create a flume stream
- val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
- // Print out the count of events received from this server in each batch
- stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
deleted file mode 100644
index dd725d72c2..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
+++ /dev/null
@@ -1,67 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with the Spark Sink running in a Flume agent. See
- * the Spark Streaming programming guide for more details.
- *
- * Usage: FlumePollingEventCount <host> <port>
- * `host` is the host on which the Spark Sink is running.
- * `port` is the port at which the Spark Sink is listening.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
- */
-object FlumePollingEventCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println(
- "Usage: FlumePollingEventCount <host> <port>")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Array(host, IntParam(port)) = args
- val batchInterval = Milliseconds(2000)
- // Create the context and set the batch size
- val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
- val ssc = new StreamingContext(sparkConf, batchInterval)
- // Create a flume stream that polls the Spark Sink running in a Flume agent
- val stream = FlumeUtils.createPollingStream(ssc, host, port)
- // Print out the count of events received from this server in each batch
- stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
deleted file mode 100644
index d772ae309f..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ /dev/null
@@ -1,119 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.mqtt._
-import org.apache.spark.SparkConf
- * A simple Mqtt publisher for demonstration purposes, repeatedly publishes
- * Space separated String Message "hello mqtt demo for spark streaming"
- */
-object MQTTPublisher {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Seq(brokerUrl, topic) = args.toSeq
- var client: MqttClient = null
- try {
- val persistence = new MemoryPersistence()
- client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
- client.connect()
- val msgtopic = client.getTopic(topic)
- val msgContent = "hello mqtt demo for spark streaming"
- val message = new MqttMessage(msgContent.getBytes("utf-8"))
- while (true) {
- try {
- msgtopic.publish(message)
- println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
- } catch {
- case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
- Thread.sleep(10)
- println("Queue is full, wait for to consume data from the message queue")
- }
- }
- } catch {
- case e: MqttException => println("Exception Caught: " + e)
- } finally {
- if (client != null) {
- client.disconnect()
- }
- }
- }
- * A sample wordcount with MqttStream stream
- *
- * To work with Mqtt, Mqtt Message broker/server required.
- * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
- * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
- * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
- * Example Java code for Mqtt Publisher and Subscriber can be found here
- * https://bitbucket.org/mkjinesh/mqttclient
- * Usage: MQTTWordCount <MqttbrokerUrl> <topic>
- * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
- *
- * To run this example locally, you may run publisher as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
- * and run the example as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
- */
-object MQTTWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- // scalastyle:off println
- System.err.println(
- "Usage: MQTTWordCount <MqttbrokerUrl> <topic>")
- // scalastyle:on println
- System.exit(1)
- }
- val Seq(brokerUrl, topic) = args.toSeq
- val sparkConf = new SparkConf().setAppName("MQTTWordCount")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
- val words = lines.flatMap(x => x.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
deleted file mode 100644
index 5af82e161a..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ /dev/null
@@ -1,116 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import com.twitter.algebird._
-import com.twitter.algebird.CMSHasherImplicits._
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-// scalastyle:off
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- * <br>
- * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
- * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- * the same approach could be used for computing popular topics for example.
- * <p>
- * <p>
- * <a href=
- * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- * of any given element, etc), that uses space sub-linear in the number of elements in the
- * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- * count.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdCMS {
- def main(args: Array[String]) {
- StreamingExamples.setStreamingLogLevels()
- // CMS parameters
- val DELTA = 1E-3
- val EPS = 0.01
- val SEED = 1
- val PERC = 0.001
- // K highest frequency elements to take
- val TOPK = 10
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
- val ssc = new StreamingContext(sparkConf, Seconds(10))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
- val users = stream.map(status => status.getUser.getId)
- // val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
- val cms = TopPctCMS.monoid[Long](EPS, DELTA, SEED, PERC)
- var globalCMS = cms.zero
- val mm = new MapMonoid[Long, Int]()
- var globalExact = Map[Long, Int]()
- val approxTopUsers = users.mapPartitions(ids => {
- ids.map(id => cms.create(id))
- }).reduce(_ ++ _)
- val exactTopUsers = users.map(id => (id, 1))
- .reduceByKey((a, b) => a + b)
- approxTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- val partialTopK = partial.heavyHitters.map(id =>
- (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- globalCMS ++= partial
- val globalTopK = globalCMS.heavyHitters.map(id =>
- (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
- partialTopK.mkString("[", ",", "]")))
- println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
- globalTopK.mkString("[", ",", "]")))
- }
- })
- exactTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partialMap = rdd.collect().toMap
- val partialTopK = rdd.map(
- {case (id, count) => (count, id)})
- .sortByKey(ascending = false).take(TOPK)
- globalExact = mm.plus(globalExact.toMap, partialMap)
- val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
- println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
- }
- })
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
deleted file mode 100644
index 6442b2a4e2..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
+++ /dev/null
@@ -1,94 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import com.twitter.algebird.HyperLogLog._
-import com.twitter.algebird.HyperLogLogMonoid
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-// scalastyle:off
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- * <p>
- * <p>
- * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * blog post</a> and this
- * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
- * blog post</a>
- * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
- * estimating the cardinality of a data stream, i.e. the number of unique elements.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdHLL {
- def main(args: Array[String]) {
- StreamingExamples.setStreamingLogLevels()
- /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
- val BIT_SIZE = 12
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
- val ssc = new StreamingContext(sparkConf, Seconds(5))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
- val users = stream.map(status => status.getUser.getId)
- val hll = new HyperLogLogMonoid(BIT_SIZE)
- var globalHll = hll.zero
- var userSet: Set[Long] = Set()
- val approxUsers = users.mapPartitions(ids => {
- ids.map(id => hll.create(id))
- }).reduce(_ + _)
- val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
- approxUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- globalHll += partial
- println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
- println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
- }
- })
- exactUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- userSet ++= partial
- println("Exact distinct users this batch: %d".format(partial.size))
- println("Exact distinct users overall: %d".format(userSet.size))
- println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
- ) * 100))
- }
- })
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
deleted file mode 100644
index a8d392ca35..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala
+++ /dev/null
@@ -1,96 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter.TwitterUtils
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-object TwitterHashTagJoinSentiments {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " +
- "<access token> <access token secret> [<filters>]")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
- val filters = args.takeRight(args.length - 4)
- // Set the system properties so that Twitter4j library used by Twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
- System.setProperty("twitter4j.oauth.accessToken", accessToken)
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
- val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- val stream = TwitterUtils.createStream(ssc, None, filters)
- val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
- // Read in the word-sentiment list and create a static RDD from it
- val wordSentimentFilePath = "data/streaming/AFINN-111.txt"
- val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
- val Array(word, happinessValue) = line.split("\t")
- (word, happinessValue.toInt)
- }.cache()
- // Determine the hash tags with the highest sentiment values by joining the streaming RDD
- // with the static RDD inside the transform() method and then multiplying
- // the frequency of the hash tag by its sentiment value
- val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
- .reduceByKeyAndWindow(_ + _, Seconds(60))
- .transform{topicCount => wordSentiments.join(topicCount)}
- .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
- .map{case (topic, happinessValue) => (happinessValue, topic)}
- .transform(_.sortByKey(false))
- val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1))
- .reduceByKeyAndWindow(_ + _, Seconds(10))
- .transform{topicCount => wordSentiments.join(topicCount)}
- .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
- .map{case (topic, happinessValue) => (happinessValue, topic)}
- .transform(_.sortByKey(false))
- // Print hash tags with the most positive sentiment values
- happiest60.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
- })
- happiest10.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
- })
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
deleted file mode 100644
index 5b69963cc8..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
+++ /dev/null
@@ -1,85 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
- * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
- * stream. The stream is instantiated with credentials and optionally filters supplied by the
- * command line arguments.
- *
- * Run this on your local machine as
- *
- */
-object TwitterPopularTags {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
- "<access token> <access token secret> [<filters>]")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
- val filters = args.takeRight(args.length - 4)
- // Set the system properties so that Twitter4j library used by twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
- System.setProperty("twitter4j.oauth.accessToken", accessToken)
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
- val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- val stream = TwitterUtils.createStream(ssc, None, filters)
- val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
- val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
- .map{case (topic, count) => (count, topic)}
- .transform(_.sortByKey(false))
- val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
- .map{case (topic, count) => (count, topic)}
- .transform(_.sortByKey(false))
- // Print popular hashtags
- topCounts60.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- })
- topCounts10.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- })
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
deleted file mode 100644
index 99b561750b..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ /dev/null
@@ -1,105 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-import scala.language.implicitConversions
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-import akka.zeromq._
-import akka.zeromq.Subscribe
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.zeromq._
- * A simple publisher for demonstration purposes, repeatedly publishes random Messages
- * every one second.
- */
-object SimpleZeroMQPublisher {
- def main(args: Array[String]): Unit = {
- if (args.length < 2) {
- System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
- System.exit(1)
- }
- val Seq(url, topic) = args.toSeq
- val acs: ActorSystem = ActorSystem()
- val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
- implicit def stringToByteString(x: String): ByteString = ByteString(x)
- val messages: List[ByteString] = List("words ", "may ", "count ")
- while (true) {
- Thread.sleep(1000)
- pubSocket ! ZMQMessage(ByteString(topic) :: messages)
- }
- acs.awaitTermination()
- }
-// scalastyle:off
- * A sample wordcount with ZeroMQStream stream
- *
- * To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
- * (http://www.zeromq.org/intro:get-the-software)
- *
- * Usage: ZeroMQWordCount <zeroMQurl> <topic>
- * <zeroMQurl> and <topic> describe where zeroMq publisher is running.
- *
- * To run this example locally, you may run publisher as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp:// foo`
- * and run the example as
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.ZeroMQWordCount tcp:// foo`
- */
-// scalastyle:on
-object ZeroMQWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val Seq(url, topic) = args.toSeq
- val sparkConf = new SparkConf().setAppName("ZeroMQWordCount")
- // Create the context and set the batch size
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
- // For this stream, a zeroMQ publisher should be running.
- val lines = ZeroMQUtils.createStream(
- ssc,
- url,
- Subscribe(topic),
- bytesToStringIterator _)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
-// scalastyle:on println