Diffstat (limited to 'examples/src/main/java')
 examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java               | 144 -----------
 examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java              |  75 ------
 examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java | 175 -------------
 3 files changed, 0 insertions(+), 394 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
deleted file mode 100644
index 7884b8cdff..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import scala.Tuple2;
-
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.akka.AkkaUtils;
-import org.apache.spark.streaming.akka.JavaActorReceiver;
-
-/**
- * A sample actor receiver, also the simplest possible: this receiver actor
- * subscribes to a typical publisher/feeder actor and receives
- * data.
- *
- * @see org.apache.spark.examples.streaming.FeederActor
- */
-class JavaSampleActorReceiver<T> extends JavaActorReceiver {
-
- private final String urlOfPublisher;
-
- public JavaSampleActorReceiver(String urlOfPublisher) {
- this.urlOfPublisher = urlOfPublisher;
- }
-
- private ActorSelection remotePublisher;
-
- @Override
- public void preStart() {
- remotePublisher = getContext().actorSelection(urlOfPublisher);
- remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
- }
-
- @Override
- public void onReceive(Object msg) throws Exception {
- @SuppressWarnings("unchecked")
- T msgT = (T) msg;
- store(msgT);
- }
-
- @Override
- public void postStop() {
- remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf());
- }
-}
-
-/**
- * A sample word count program demonstrating how to plug in an
- * actor as a receiver.
- * Usage: JavaActorWordCount <hostname> <port>
- * <hostname> and <port> describe the Akka system that the Spark sample feeder is running on.
- *
- * To run this example locally, first run the feeder actor:
- * <pre>
- * $ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999
- * </pre>
- * and then run the example:
- * <pre>
- * $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount localhost 9999
- * </pre>
- */
-public class JavaActorWordCount {
-
- public static void main(String[] args) {
- if (args.length < 2) {
- System.err.println("Usage: JavaActorWordCount <hostname> <port>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- final String host = args[0];
- final String port = args[1];
- SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount");
- // Create the context and set the batch size
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
- String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
-
-    /*
-     * The following uses AkkaUtils.createStream to plug in a custom actor as a receiver.
-     *
-     * An important point to note:
-     * since the actor may exist outside the Spark framework, it is the user's
-     * responsibility to ensure type safety, i.e. the type of the data received
-     * and of the InputDStream should be the same.
-     *
-     * For example: both AkkaUtils.createStream and JavaSampleActorReceiver are
-     * parameterized to the same type to ensure type safety.
-     */
- JavaDStream<String> lines = AkkaUtils.createStream(
- jssc,
- Props.create(JavaSampleActorReceiver.class, feederActorURI),
- "SampleReceiver");
-
- // compute wordcount
- lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) {
- return Arrays.asList(s.split("\\s+")).iterator();
- }
- }).mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }).print();
-
- jssc.start();
- jssc.awaitTermination();
- }
-}
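For reference, the word-count pipeline that the removed example demonstrated does not depend on the Akka connector: the same flatMap/mapToPair/reduceByKey chain runs against any input DStream. Below is a minimal sketch, assuming a plain TCP text source in place of the actor receiver and Java 8 lambdas in place of the anonymous inner classes above; the "localhost"/9999 values are placeholders and SocketWordCountSketch is a hypothetical class name, not part of the Spark examples.

package org.apache.spark.examples.streaming;

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SocketWordCountSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setAppName("SocketWordCountSketch");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    // A TCP text source stands in for the removed actor receiver;
    // "localhost" and 9999 are placeholder values.
    JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // The same word-count chain as in the deleted file, written with lambdas.
    lines.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator())
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2)
        .print();

    jssc.start();
    jssc.awaitTermination();
  }
}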
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
deleted file mode 100644
index da56637fe8..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server at the requested host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <host> <port>
- * <host> is the host the Flume receiver will be started on - a receiver
- *          creates a server and listens for Flume events.
- * <port> is the port the Flume receiver will listen on.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
- */
-public final class JavaFlumeEventCount {
- private JavaFlumeEventCount() {
- }
-
- public static void main(String[] args) {
- if (args.length != 2) {
- System.err.println("Usage: JavaFlumeEventCount <host> <port>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- String host = args[0];
- int port = Integer.parseInt(args[1]);
-
- Duration batchInterval = new Duration(2000);
- SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
- JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
- JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
-
- flumeStream.count();
-
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
-
- ssc.start();
- ssc.awaitTermination();
- }
-}
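For reference, the count-and-report idiom in the removed example (count() per batch, then map() to a message, then print()) works on any DStream, not only a Flume stream. Below is a minimal sketch, assuming a self-contained queue-backed stream in place of the Flume source so it runs with no external sink; EventCountSketch and the sample event strings are hypothetical.

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class EventCountSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setAppName("EventCountSketch");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    // A queue-backed stream stands in for the removed Flume source so the
    // sketch runs with no external dependencies.
    Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
    rddQueue.add(ssc.sparkContext().parallelize(
        Arrays.asList("event-1", "event-2", "event-3")));
    JavaDStream<String> events = ssc.queueStream(rddQueue);

    // The same count-and-report idiom as the deleted example, with a lambda.
    events.count()
        .map(in -> "Received " + in + " events.")
        .print();

    ssc.start();
    ssc.awaitTermination();
  }
}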
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
deleted file mode 100644
index f0ae9a99ba..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.twitter.TwitterUtils;
-import scala.Tuple2;
-import twitter4j.Status;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-public class JavaTwitterHashTagJoinSentiments {
-
- public static void main(String[] args) {
- if (args.length < 4) {
- System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
- " <access token> <access token secret> [<filters>]");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- String consumerKey = args[0];
- String consumerSecret = args[1];
- String accessToken = args[2];
- String accessTokenSecret = args[3];
- String[] filters = Arrays.copyOfRange(args, 4, args.length);
-
- // Set the system properties so that Twitter4j library used by Twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
- System.setProperty("twitter4j.oauth.accessToken", accessToken);
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
- JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
-
- JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
- @Override
- public Iterator<String> call(Status s) {
- return Arrays.asList(s.getText().split(" ")).iterator();
- }
- });
-
- JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
- @Override
- public Boolean call(String word) {
- return word.startsWith("#");
- }
- });
-
- // Read in the word-sentiment list and create a static RDD from it
- String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
- final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath)
- .mapToPair(new PairFunction<String, String, Double>(){
- @Override
- public Tuple2<String, Double> call(String line) {
- String[] columns = line.split("\t");
- return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
- }
- });
-
- JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- // leave out the # character
- return new Tuple2<>(s.substring(1), 1);
- }
- });
-
- JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- }, new Duration(10000));
-
- // Determine the hash tags with the highest sentiment values by joining the streaming RDD
- // with the static RDD inside the transform() method and then multiplying
- // the frequency of the hash tag by its sentiment value
- JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
- hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
- JavaPairRDD<String, Tuple2<Double, Integer>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<Double, Integer>> call(
- JavaPairRDD<String, Integer> topicCount) {
- return wordSentiments.join(topicCount);
- }
- });
-
- JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
- new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
- @Override
- public Tuple2<String, Double> call(Tuple2<String,
- Tuple2<Double, Integer>> topicAndTuplePair) {
- Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
- return new Tuple2<>(topicAndTuplePair._1(),
- happinessAndCount._1() * happinessAndCount._2());
- }
- });
-
- JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
- new PairFunction<Tuple2<String, Double>, Double, String>() {
- @Override
- public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) {
- return new Tuple2<>(topicHappiness._2(),
- topicHappiness._1());
- }
- });
-
- JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
- new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
- @Override
- public JavaPairRDD<Double, String> call(
- JavaPairRDD<Double, String> happinessAndTopics) {
- return happinessAndTopics.sortByKey(false);
- }
- }
- );
-
- // Print hash tags with the most positive sentiment values
- happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
- @Override
- public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
- List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
- System.out.println(
- String.format("\nHappiest topics in last 10 seconds (%s total):",
- happinessTopicPairs.count()));
- for (Tuple2<Double, String> pair : topList) {
- System.out.println(
- String.format("%s (%s happiness)", pair._2(), pair._1()));
- }
- }
- });
-
- jssc.start();
- jssc.awaitTermination();
- }
-}
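For reference, the core technique in this removed example (joining a windowed, keyed stream against a static pair RDD inside transformToPair(), then weighting counts by the joined value) is independent of the Twitter source. Below is a minimal sketch, assuming a socket text source in place of the Twitter receiver; StaticJoinSketch, the placeholder host/port, and the lambda rewrite are assumptions, while the AFINN file path and its tab-separated layout follow the deleted code.

package org.apache.spark.examples.streaming;

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StaticJoinSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setAppName("StaticJoinSketch");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    // Static word-sentiment table; path and tab-separated layout follow
    // the deleted example (data/streaming/AFINN-111.txt).
    JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext()
        .textFile("data/streaming/AFINN-111.txt")
        .mapToPair(line -> {
          String[] columns = line.split("\t");
          return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
        });

    // Any text stream works here; a socket source replaces the removed
    // Twitter receiver ("localhost"/9999 are placeholders).
    JavaDStream<String> words = jssc.socketTextStream("localhost", 9999)
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator());

    // Count words over a 10-second window, join the counts against the
    // static RDD inside transformToPair, weight by sentiment, and print.
    JavaPairDStream<String, Integer> counts = words
        .mapToPair(w -> new Tuple2<>(w, 1))
        .reduceByKeyAndWindow((a, b) -> a + b, new Duration(10000));

    counts
        .transformToPair(rdd -> wordSentiments.join(rdd))
        .mapToPair(t -> new Tuple2<>(t._1(), t._2()._1() * t._2()._2()))
        .print();

    jssc.start();
    jssc.awaitTermination();
  }
}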