Diffstat (limited to 'examples')
13 files changed, 0 insertions, 1352 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index 3a3f547915..92bb373c73 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -67,37 +67,6 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.spark-project.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java deleted file mode 100644 index 7884b8cdff..0000000000 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import java.util.Arrays; -import java.util.Iterator; - -import scala.Tuple2; - -import akka.actor.ActorSelection; -import akka.actor.Props; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.akka.AkkaUtils; -import org.apache.spark.streaming.akka.JavaActorReceiver; - -/** - * A sample actor as receiver, is also simplest. This receiver actor - * goes and subscribe to a typical publisher/feeder actor and receives - * data. 
- * - * @see [[org.apache.spark.examples.streaming.FeederActor]] - */ -class JavaSampleActorReceiver<T> extends JavaActorReceiver { - - private final String urlOfPublisher; - - public JavaSampleActorReceiver(String urlOfPublisher) { - this.urlOfPublisher = urlOfPublisher; - } - - private ActorSelection remotePublisher; - - @Override - public void preStart() { - remotePublisher = getContext().actorSelection(urlOfPublisher); - remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); - } - - @Override - public void onReceive(Object msg) throws Exception { - @SuppressWarnings("unchecked") - T msgT = (T) msg; - store(msgT); - } - - @Override - public void postStop() { - remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf()); - } -} - -/** - * A sample word count program demonstrating the use of plugging in - * Actor as Receiver - * Usage: JavaActorWordCount <hostname> <port> - * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. - * - * To run this example locally, you may run Feeder Actor as - * <code><pre> - * $ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999 - * </pre></code> - * and then run the example - * <code><pre> - * $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount localhost 9999 - * </pre></code> - */ -public class JavaActorWordCount { - - public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Usage: JavaActorWordCount <hostname> <port>"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - final String host = args[0]; - final String port = args[1]; - SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount"); - // Create the context and set the batch size - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); - - String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor"; - - /* - * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver - * - * An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream - * should be same. - * - * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized - * to same type to ensure type safety. 
- */ - JavaDStream<String> lines = AkkaUtils.createStream( - jssc, - Props.create(JavaSampleActorReceiver.class, feederActorURI), - "SampleReceiver"); - - // compute wordcount - lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String s) { - return Arrays.asList(s.split("\\s+")).iterator(); - } - }).mapToPair(new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }).print(); - - jssc.start(); - jssc.awaitTermination(); - } -} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java deleted file mode 100644 index da56637fe8..0000000000 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.examples.streaming.StreamingExamples; -import org.apache.spark.streaming.*; -import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.flume.FlumeUtils; -import org.apache.spark.streaming.flume.SparkFlumeEvent; - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with an AvroSink in Flume. It will start - * an Avro server on at the request host:port address and listen for requests. - * Your Flume AvroSink should be pointed to this address. - * - * Usage: JavaFlumeEventCount <host> <port> - * <host> is the host the Flume receiver will be started on - a receiver - * creates a server and listens for flume events. - * <port> is the port the Flume receiver will listen on. 
- * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>` - */ -public final class JavaFlumeEventCount { - private JavaFlumeEventCount() { - } - - public static void main(String[] args) { - if (args.length != 2) { - System.err.println("Usage: JavaFlumeEventCount <host> <port>"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - String host = args[0]; - int port = Integer.parseInt(args[1]); - - Duration batchInterval = new Duration(2000); - SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); - JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); - JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); - - flumeStream.count(); - - flumeStream.count().map(new Function<Long, String>() { - @Override - public String call(Long in) { - return "Received " + in + " flume events."; - } - }).print(); - - ssc.start(); - ssc.awaitTermination(); - } -} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java deleted file mode 100644 index f0ae9a99ba..0000000000 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.examples.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.twitter.TwitterUtils; -import scala.Tuple2; -import twitter4j.Status; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; - -/** - * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of - * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN) - */ -public class JavaTwitterHashTagJoinSentiments { - - public static void main(String[] args) { - if (args.length < 4) { - System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" + - " <access token> <access token secret> [<filters>]"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - String consumerKey = args[0]; - String consumerSecret = args[1]; - String accessToken = args[2]; - String accessTokenSecret = args[3]; - String[] filters = Arrays.copyOfRange(args, 4, args.length); - - // Set the system properties so that Twitter4j library used by Twitter stream - // can use them to generate OAuth credentials - System.setProperty("twitter4j.oauth.consumerKey", consumerKey); - System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret); - System.setProperty("twitter4j.oauth.accessToken", accessToken); - System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret); - - SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments"); - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); - JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters); - - JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() { - @Override - public Iterator<String> call(Status s) { - return Arrays.asList(s.getText().split(" ")).iterator(); - } - }); - - JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() { - @Override - public Boolean call(String word) { - return word.startsWith("#"); - } - }); - - // Read in the word-sentiment list and create a static RDD from it - String wordSentimentFilePath = "data/streaming/AFINN-111.txt"; - final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath) - .mapToPair(new PairFunction<String, String, Double>(){ - @Override - public Tuple2<String, Double> call(String line) { - String[] columns = line.split("\t"); - return new Tuple2<>(columns[0], Double.parseDouble(columns[1])); - } - }); - - JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - // leave out the # character - return new Tuple2<>(s.substring(1), 1); - } - }); - - JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow( - new Function2<Integer, Integer, Integer>() 
{ - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }, new Duration(10000)); - - // Determine the hash tags with the highest sentiment values by joining the streaming RDD - // with the static RDD inside the transform() method and then multiplying - // the frequency of the hash tag by its sentiment value - JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples = - hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>, - JavaPairRDD<String, Tuple2<Double, Integer>>>() { - @Override - public JavaPairRDD<String, Tuple2<Double, Integer>> call( - JavaPairRDD<String, Integer> topicCount) { - return wordSentiments.join(topicCount); - } - }); - - JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair( - new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() { - @Override - public Tuple2<String, Double> call(Tuple2<String, - Tuple2<Double, Integer>> topicAndTuplePair) { - Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2(); - return new Tuple2<>(topicAndTuplePair._1(), - happinessAndCount._1() * happinessAndCount._2()); - } - }); - - JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair( - new PairFunction<Tuple2<String, Double>, Double, String>() { - @Override - public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) { - return new Tuple2<>(topicHappiness._2(), - topicHappiness._1()); - } - }); - - JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair( - new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() { - @Override - public JavaPairRDD<Double, String> call( - JavaPairRDD<Double, String> happinessAndTopics) { - return happinessAndTopics.sortByKey(false); - } - } - ); - - // Print hash tags with the most positive sentiment values - happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() { - @Override - public void call(JavaPairRDD<Double, String> happinessTopicPairs) { - List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10); - System.out.println( - String.format("\nHappiest topics in last 10 seconds (%s total):", - happinessTopicPairs.count())); - for (Tuple2<Double, String> pair : topList) { - System.out.println( - String.format("%s (%s happiness)", pair._2(), pair._1())); - } - } - }); - - jssc.start(); - jssc.awaitTermination(); - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala deleted file mode 100644 index 844772a289..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import scala.collection.mutable.LinkedHashSet -import scala.util.Random - -import akka.actor._ -import com.typesafe.config.ConfigFactory - -import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} - -case class SubscribeReceiver(receiverActor: ActorRef) -case class UnsubscribeReceiver(receiverActor: ActorRef) - -/** - * Sends the random content to every receiver subscribed with 1/2 - * second delay. - */ -class FeederActor extends Actor { - - val rand = new Random() - val receivers = new LinkedHashSet[ActorRef]() - - val strings: Array[String] = Array("words ", "may ", "count ") - - def makeMessage(): String = { - val x = rand.nextInt(3) - strings(x) + strings(2 - x) - } - - /* - * A thread to generate random messages - */ - new Thread() { - override def run() { - while (true) { - Thread.sleep(500) - receivers.foreach(_ ! makeMessage) - } - } - }.start() - - def receive: Receive = { - case SubscribeReceiver(receiverActor: ActorRef) => - println("received subscribe from %s".format(receiverActor.toString)) - receivers += receiverActor - - case UnsubscribeReceiver(receiverActor: ActorRef) => - println("received unsubscribe from %s".format(receiverActor.toString)) - receivers -= receiverActor - } -} - -/** - * A sample actor as receiver, is also simplest. This receiver actor - * goes and subscribe to a typical publisher/feeder actor and receives - * data. - * - * @see [[org.apache.spark.examples.streaming.FeederActor]] - */ -class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver { - - lazy private val remotePublisher = context.actorSelection(urlOfPublisher) - - override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self) - - def receive: PartialFunction[Any, Unit] = { - case msg => store(msg.asInstanceOf[T]) - } - - override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self) - -} - -/** - * A sample feeder actor - * - * Usage: FeederActor <hostname> <port> - * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on. - */ -object FeederActor { - - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: FeederActor <hostname> <port>\n") - System.exit(1) - } - val Seq(host, port) = args.toSeq - - val akkaConf = ConfigFactory.parseString( - s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] - |akka.remote.netty.tcp.hostname = "$host" - |akka.remote.netty.tcp.port = $port - |""".stripMargin) - val actorSystem = ActorSystem("test", akkaConf) - val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") - - println("Feeder started as:" + feeder) - - actorSystem.awaitTermination() - } -} - -/** - * A sample word count program demonstrating the use of plugging in - * - * Actor as Receiver - * Usage: ActorWordCount <hostname> <port> - * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. 
- * - * To run this example locally, you may run Feeder Actor as - * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999` - * and then run the example - * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount localhost 9999` - */ -object ActorWordCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println( - "Usage: ActorWordCount <hostname> <port>") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Seq(host, port) = args.toSeq - val sparkConf = new SparkConf().setAppName("ActorWordCount") - // Create the context and set the batch size - val ssc = new StreamingContext(sparkConf, Seconds(2)) - - /* - * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver - * - * An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDStream - * should be same. - * - * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized - * to same type to ensure type safety. - */ - val lines = AkkaUtils.createStream[String]( - ssc, - Props(classOf[SampleActorReceiver[String]], - "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)), - "SampleReceiver") - - // compute wordcount - lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala deleted file mode 100644 index 91e52e4eff..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ -import org.apache.spark.streaming.flume._ -import org.apache.spark.util.IntParam - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with an AvroSink in Flume. It will start - * an Avro server on at the request host:port address and listen for requests. - * Your Flume AvroSink should be pointed to this address. - * - * Usage: FlumeEventCount <host> <port> - * <host> is the host the Flume receiver will be started on - a receiver - * creates a server and listens for flume events. - * <port> is the port the Flume receiver will listen on. 
- * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> ` - */ -object FlumeEventCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println( - "Usage: FlumeEventCount <host> <port>") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(host, IntParam(port)) = args - - val batchInterval = Milliseconds(2000) - - // Create the context and set the batch size - val sparkConf = new SparkConf().setAppName("FlumeEventCount") - val ssc = new StreamingContext(sparkConf, batchInterval) - - // Create a flume stream - val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) - - // Print out the count of events received from this server in each batch - stream.count().map(cnt => "Received " + cnt + " flume events." ).print() - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala deleted file mode 100644 index dd725d72c2..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import org.apache.spark.SparkConf -import org.apache.spark.streaming._ -import org.apache.spark.streaming.flume._ -import org.apache.spark.util.IntParam - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with the Spark Sink running in a Flume agent. See - * the Spark Streaming programming guide for more details. - * - * Usage: FlumePollingEventCount <host> <port> - * `host` is the host on which the Spark Sink is running. - * `port` is the port at which the Spark Sink is listening. 
- * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` - */ -object FlumePollingEventCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println( - "Usage: FlumePollingEventCount <host> <port>") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(host, IntParam(port)) = args - - val batchInterval = Milliseconds(2000) - - // Create the context and set the batch size - val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") - val ssc = new StreamingContext(sparkConf, batchInterval) - - // Create a flume stream that polls the Spark Sink running in a Flume agent - val stream = FlumeUtils.createPollingStream(ssc, host, port) - - // Print out the count of events received from this server in each batch - stream.count().map(cnt => "Received " + cnt + " flume events." ).print() - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala deleted file mode 100644 index d772ae309f..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import org.eclipse.paho.client.mqttv3._ -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.mqtt._ -import org.apache.spark.SparkConf - -/** - * A simple Mqtt publisher for demonstration purposes, repeatedly publishes - * Space separated String Message "hello mqtt demo for spark streaming" - */ -object MQTTPublisher { - - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Seq(brokerUrl, topic) = args.toSeq - - var client: MqttClient = null - - try { - val persistence = new MemoryPersistence() - client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) - - client.connect() - - val msgtopic = client.getTopic(topic) - val msgContent = "hello mqtt demo for spark streaming" - val message = new MqttMessage(msgContent.getBytes("utf-8")) - - while (true) { - try { - msgtopic.publish(message) - println(s"Published data. 
topic: ${msgtopic.getName()}; Message: $message") - } catch { - case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => - Thread.sleep(10) - println("Queue is full, wait for to consume data from the message queue") - } - } - } catch { - case e: MqttException => println("Exception Caught: " + e) - } finally { - if (client != null) { - client.disconnect() - } - } - } -} - -/** - * A sample wordcount with MqttStream stream - * - * To work with Mqtt, Mqtt Message broker/server required. - * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker - * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` - * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ - * Example Java code for Mqtt Publisher and Subscriber can be found here - * https://bitbucket.org/mkjinesh/mqttclient - * Usage: MQTTWordCount <MqttbrokerUrl> <topic> - * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. - * - * To run this example locally, you may run publisher as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` - * and run the example as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo` - */ -object MQTTWordCount { - - def main(args: Array[String]) { - if (args.length < 2) { - // scalastyle:off println - System.err.println( - "Usage: MQTTWordCount <MqttbrokerUrl> <topic>") - // scalastyle:on println - System.exit(1) - } - - val Seq(brokerUrl, topic) = args.toSeq - val sparkConf = new SparkConf().setAppName("MQTTWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) - val words = lines.flatMap(x => x.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - - wordCounts.print() - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala deleted file mode 100644 index 5af82e161a..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import com.twitter.algebird._ -import com.twitter.algebird.CMSHasherImplicits._ - -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.twitter._ - -// scalastyle:off -/** - * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute - * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. - * <br> - * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs, - * the example operates on Long IDs. Once the implementation supports other inputs (such as String), - * the same approach could be used for computing popular topics for example. - * <p> - * <p> - * <a href= - * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data - * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency - * of any given element, etc), that uses space sub-linear in the number of elements in the - * stream. Once elements are added to the CMS, the estimated count of an element can be computed, - * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total - * count. - * <p><p> - * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the - * reduce operation. - */ -// scalastyle:on -object TwitterAlgebirdCMS { - def main(args: Array[String]) { - StreamingExamples.setStreamingLogLevels() - - // CMS parameters - val DELTA = 1E-3 - val EPS = 0.01 - val SEED = 1 - val PERC = 0.001 - // K highest frequency elements to take - val TOPK = 10 - - val filters = args - val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") - val ssc = new StreamingContext(sparkConf, Seconds(10)) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) - - val users = stream.map(status => status.getUser.getId) - - // val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) - val cms = TopPctCMS.monoid[Long](EPS, DELTA, SEED, PERC) - var globalCMS = cms.zero - val mm = new MapMonoid[Long, Int]() - var globalExact = Map[Long, Int]() - - val approxTopUsers = users.mapPartitions(ids => { - ids.map(id => cms.create(id)) - }).reduce(_ ++ _) - - val exactTopUsers = users.map(id => (id, 1)) - .reduceByKey((a, b) => a + b) - - approxTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - val partialTopK = partial.heavyHitters.map(id => - (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - globalCMS ++= partial - val globalTopK = globalCMS.heavyHitters.map(id => - (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, - partialTopK.mkString("[", ",", "]"))) - println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, - globalTopK.mkString("[", ",", "]"))) - } - }) - - exactTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partialMap = rdd.collect().toMap - val partialTopK = rdd.map( - {case (id, count) => (count, id)}) - .sortByKey(ascending = false).take(TOPK) - globalExact = mm.plus(globalExact.toMap, partialMap) - val globalTopK = 
globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) - println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala deleted file mode 100644 index 6442b2a4e2..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import com.twitter.algebird.HyperLogLog._ -import com.twitter.algebird.HyperLogLogMonoid - -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.twitter._ - -// scalastyle:off -/** - * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute - * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. - * <p> - * <p> - * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * blog post</a> and this - * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html"> - * blog post</a> - * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for - * estimating the cardinality of a data stream, i.e. the number of unique elements. - * <p><p> - * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the - * reduce operation. 
- */ -// scalastyle:on -object TwitterAlgebirdHLL { - def main(args: Array[String]) { - - StreamingExamples.setStreamingLogLevels() - - /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ - val BIT_SIZE = 12 - val filters = args - val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") - val ssc = new StreamingContext(sparkConf, Seconds(5)) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) - - val users = stream.map(status => status.getUser.getId) - - val hll = new HyperLogLogMonoid(BIT_SIZE) - var globalHll = hll.zero - var userSet: Set[Long] = Set() - - val approxUsers = users.mapPartitions(ids => { - ids.map(id => hll.create(id)) - }).reduce(_ + _) - - val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - - approxUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - globalHll += partial - println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) - println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) - } - }) - - exactUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - userSet ++= partial - println("Exact distinct users this batch: %d".format(partial.size)) - println("Exact distinct users overall: %d".format(userSet.size)) - println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1 - ) * 100)) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala deleted file mode 100644 index a8d392ca35..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.twitter.TwitterUtils - -/** - * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of - * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN) - */ -object TwitterHashTagJoinSentiments { - def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " + - "<access token> <access token secret> [<filters>]") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) - val filters = args.takeRight(args.length - 4) - - // Set the system properties so that Twitter4j library used by Twitter stream - // can use them to generate OAuth credentials - System.setProperty("twitter4j.oauth.consumerKey", consumerKey) - System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) - System.setProperty("twitter4j.oauth.accessToken", accessToken) - System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) - - val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - val stream = TwitterUtils.createStream(ssc, None, filters) - - val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) - - // Read in the word-sentiment list and create a static RDD from it - val wordSentimentFilePath = "data/streaming/AFINN-111.txt" - val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line => - val Array(word, happinessValue) = line.split("\t") - (word, happinessValue.toInt) - }.cache() - - // Determine the hash tags with the highest sentiment values by joining the streaming RDD - // with the static RDD inside the transform() method and then multiplying - // the frequency of the hash tag by its sentiment value - val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1)) - .reduceByKeyAndWindow(_ + _, Seconds(60)) - .transform{topicCount => wordSentiments.join(topicCount)} - .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)} - .map{case (topic, happinessValue) => (happinessValue, topic)} - .transform(_.sortByKey(false)) - - val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1)) - .reduceByKeyAndWindow(_ + _, Seconds(10)) - .transform{topicCount => wordSentiments.join(topicCount)} - .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)} - .map{case (topic, happinessValue) => (happinessValue, topic)} - .transform(_.sortByKey(false)) - - // Print hash tags with the most positive sentiment values - happiest60.foreachRDD(rdd => { - val topList = rdd.take(10) - println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))} - }) - - happiest10.foreachRDD(rdd => { - val topList = rdd.take(10) - println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))} - }) - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala 
b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala deleted file mode 100644 index 5b69963cc8..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.twitter._ -import org.apache.spark.SparkConf - -/** - * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter - * stream. The stream is instantiated with credentials and optionally filters supplied by the - * command line arguments. - * - * Run this on your local machine as - * - */ -object TwitterPopularTags { - def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + - "<access token> <access token secret> [<filters>]") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) - val filters = args.takeRight(args.length - 4) - - // Set the system properties so that Twitter4j library used by twitter stream - // can use them to generate OAuth credentials - System.setProperty("twitter4j.oauth.consumerKey", consumerKey) - System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) - System.setProperty("twitter4j.oauth.accessToken", accessToken) - System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) - - val sparkConf = new SparkConf().setAppName("TwitterPopularTags") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - val stream = TwitterUtils.createStream(ssc, None, filters) - - val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) - - val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) - - val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) - - - // Print popular hashtags - topCounts60.foreachRDD(rdd => { - val topList = rdd.take(10) - println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - }) - - topCounts10.foreachRDD(rdd => { - val topList = rdd.take(10) - println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - }) - - ssc.start() - 
ssc.awaitTermination() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala deleted file mode 100644 index 99b561750b..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import scala.language.implicitConversions - -import akka.actor.ActorSystem -import akka.actor.actorRef2Scala -import akka.util.ByteString -import akka.zeromq._ -import akka.zeromq.Subscribe - -import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.zeromq._ - -/** - * A simple publisher for demonstration purposes, repeatedly publishes random Messages - * every one second. - */ -object SimpleZeroMQPublisher { - - def main(args: Array[String]): Unit = { - if (args.length < 2) { - System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ") - System.exit(1) - } - - val Seq(url, topic) = args.toSeq - val acs: ActorSystem = ActorSystem() - - val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) - implicit def stringToByteString(x: String): ByteString = ByteString(x) - val messages: List[ByteString] = List("words ", "may ", "count ") - while (true) { - Thread.sleep(1000) - pubSocket ! ZMQMessage(ByteString(topic) :: messages) - } - acs.awaitTermination() - } -} - -// scalastyle:off -/** - * A sample wordcount with ZeroMQStream stream - * - * To work with zeroMQ, some native libraries have to be installed. - * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] - * (http://www.zeromq.org/intro:get-the-software) - * - * Usage: ZeroMQWordCount <zeroMQurl> <topic> - * <zeroMQurl> and <topic> describe where zeroMq publisher is running. 
- * - * To run this example locally, you may run publisher as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo` - * and run the example as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo` - */ -// scalastyle:on -object ZeroMQWordCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>") - System.exit(1) - } - StreamingExamples.setStreamingLogLevels() - val Seq(url, topic) = args.toSeq - val sparkConf = new SparkConf().setAppName("ZeroMQWordCount") - // Create the context and set the batch size - val ssc = new StreamingContext(sparkConf, Seconds(2)) - - def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator - - // For this stream, a zeroMQ publisher should be running. - val lines = ZeroMQUtils.createStream( - ssc, - url, - Subscribe(topic), - bytesToStringIterator _) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println
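All of the deleted examples share one shape: an external-connector receiver feeds lines into a DStream, which is then word-counted (or aggregated) and printed. That shape does not depend on the removed connectors; it survives in Spark Streaming's public org.apache.spark.streaming.receiver.Receiver API, and the examples/pom.xml hunk above keeps spark-streaming-kafka, so the Kafka-based examples are likewise unaffected. Below is a minimal sketch of the same word-count pattern built only on the Receiver API. It is not part of this commit: LineReceiver and CustomReceiverWordCount are illustrative names, and a plain TCP text source (e.g. `nc -lk 9999`) is assumed in place of the actor/MQTT/ZeroMQ publishers.

package org.apache.spark.examples.streaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * A minimal line-oriented receiver over a plain TCP socket. It plays the role
 * the actor/MQTT/ZeroMQ receivers played in the deleted examples, using only
 * Spark's built-in Receiver API.
 */
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Do the blocking reads on a dedicated thread so onStart() returns promptly.
    new Thread("LineReceiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // receive() polls isStopped() and exits on its own; nothing else to tear down.
  override def onStop(): Unit = ()

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand each line to Spark for buffering and replication
        line = reader.readLine()
      }
      restart("Source closed the connection, reconnecting")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

/**
 * The same word count the deleted examples computed, fed by the custom
 * receiver above instead of an external connector.
 */
object CustomReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiverWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val sparkConf = new SparkConf().setAppName("CustomReceiverWordCount")
    // Create the context and set the batch size, as the deleted examples did
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val lines = ssc.receiverStream(new LineReceiver(args(0), args(1).toInt))
    lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

If compiled into the examples jar, this would run the same way as the examples it stands in for, e.g. `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiverWordCount localhost 9999` against a feeder such as `nc -lk 9999`.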