author     Shixiong Zhu <shixiong@databricks.com>    2016-03-14 16:56:04 -0700
committer  Reynold Xin <rxin@databricks.com>         2016-03-14 16:56:04 -0700
commit     06dec37455c3f800897defee6fad0da623f26050 (patch)
tree       d49dd098587f8a3c7a019b0aad605327da6fcecd /examples/src/main/java
parent     8301fadd8d269da11e72870b7a889596e3337839 (diff)
[SPARK-13843][STREAMING] Move streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages
## What changes were proposed in this pull request?

Currently there are a few sub-projects, each integrating with a different external source for Streaming. Now that we have a better way to include external libraries (Spark packages), and with Spark 2.0 coming up, we can move the following projects out of Spark to https://github.com/spark-packages:

- streaming-flume
- streaming-akka
- streaming-mqtt
- streaming-zeromq
- streaming-twitter

These are ancillary packages, and given the overhead of maintenance, test runs, and PR failures, they are better maintained outside Spark. In addition, these projects can follow their own release cycles, so we can release them faster. I have already copied these projects to https://github.com/spark-packages.

## How was this patch tested?

Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11672 from zsxwing/remove-external-pkg.
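For applications that keep using one of these connectors, the practical change is to the build: the classes no longer ship with Spark, so the externally published artifact must be added explicitly, for example through spark-submit's --packages flag. A sketch of the invocation, with placeholder coordinates, class, and jar names, since this commit does not pin down how each project is published under spark-packages:

$ bin/spark-submit --packages <groupId>:<artifactId>:<version> --class com.example.MyStreamingApp my-app.jar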
Diffstat (limited to 'examples/src/main/java')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java               144
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java               75
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java  175
3 files changed, 0 insertions, 394 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
deleted file mode 100644
index 7884b8cdff..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import scala.Tuple2;
-
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.akka.AkkaUtils;
-import org.apache.spark.streaming.akka.JavaActorReceiver;
-
-/**
- * A sample actor used as a receiver, the simplest possible. This receiver actor
- * subscribes to a typical publisher/feeder actor and receives
- * data.
- *
- * @see org.apache.spark.examples.streaming.FeederActor
- */
-class JavaSampleActorReceiver<T> extends JavaActorReceiver {
-
- private final String urlOfPublisher;
-
- public JavaSampleActorReceiver(String urlOfPublisher) {
- this.urlOfPublisher = urlOfPublisher;
- }
-
- private ActorSelection remotePublisher;
-
- @Override
- public void preStart() {
- remotePublisher = getContext().actorSelection(urlOfPublisher);
- remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
- }
-
- @Override
- public void onReceive(Object msg) throws Exception {
- @SuppressWarnings("unchecked")
- T msgT = (T) msg;
- store(msgT);
- }
-
- @Override
- public void postStop() {
- remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf());
- }
-}
-
-/**
- * A sample word count program demonstrating the use of plugging in
- * Actor as Receiver
- * Usage: JavaActorWordCount <hostname> <port>
- * <hostname> and <port> describe the Akka actor system on which the sample FeederActor is running.
- *
- * To run this example locally, you may run Feeder Actor as
- * <code><pre>
- * $ bin/run-example org.apache.spark.examples.streaming.FeederActor localhost 9999
- * </pre></code>
- * and then run the example
- * <code><pre>
- * $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount localhost 9999
- * </pre></code>
- */
-public class JavaActorWordCount {
-
- public static void main(String[] args) {
- if (args.length < 2) {
- System.err.println("Usage: JavaActorWordCount <hostname> <port>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- final String host = args[0];
- final String port = args[1];
- SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount");
- // Create the context and set the batch size
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
- String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
-
- /*
- * The following uses AkkaUtils.createStream to plug in a custom actor as the receiver.
- *
- * An important point to note:
- * Since the actor may live outside the Spark framework, it is the user's responsibility
- * to ensure type safety, i.e. the type of the data received and of the input DStream
- * should be the same.
- *
- * For example: both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
- * to the same type to ensure type safety.
- */
- JavaDStream<String> lines = AkkaUtils.createStream(
- jssc,
- Props.create(JavaSampleActorReceiver.class, feederActorURI),
- "SampleReceiver");
-
- // compute wordcount
- lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) {
- return Arrays.asList(s.split("\\s+")).iterator();
- }
- }).mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }).print();
-
- jssc.start();
- jssc.awaitTermination();
- }
-}
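With streaming-akka moved out, the closest stock equivalent of the deleted example swaps the actor source for a built-in one and leaves the word-count pipeline untouched. A minimal sketch, assuming a plain text server on <hostname>:<port> (for instance `nc -lk 9999`) in place of the feeder actor; the class name is hypothetical and the rest mirrors the deleted code:

import java.util.Arrays;
import java.util.Iterator;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JavaSocketWordCount {  // hypothetical class, not part of this commit
  public static void main(String[] args) throws Exception {
    // expects <hostname> <port> as arguments
    SparkConf sparkConf = new SparkConf().setAppName("JavaSocketWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    // Built-in socket source stands in for AkkaUtils.createStream
    JavaDStream<String> lines = jssc.socketTextStream(args[0], Integer.parseInt(args[1]));

    // Identical word-count pipeline to the deleted example
    lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(s.split("\\s+")).iterator();
      }
    }).mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<>(s, 1);
      }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    }).print();

    jssc.start();
    jssc.awaitTermination();
  }
}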
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
deleted file mode 100644
index da56637fe8..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server at the requested host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <host> <port>
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for Flume events.
- * <port> is the port the Flume receiver will listen on.
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
- */
-public final class JavaFlumeEventCount {
- private JavaFlumeEventCount() {
- }
-
- public static void main(String[] args) {
- if (args.length != 2) {
- System.err.println("Usage: JavaFlumeEventCount <host> <port>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- String host = args[0];
- int port = Integer.parseInt(args[1]);
-
- Duration batchInterval = new Duration(2000);
- SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
- JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
- JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
-
-
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
-
- ssc.start();
- ssc.awaitTermination();
- }
-}
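The Flume integration itself is unchanged by this commit; only its home moves. For applications that keep using it, the code stays the same and only the build changes, since org.apache.spark.streaming.flume now resolves from an externally published artifact rather than from Spark itself. A minimal sketch under that assumption (the artifact's coordinates are not specified by this commit, so treat the dependency as hypothetical; the class name is illustrative). Note that the bare flumeStream.count() statement in the original was a no-op, since DStream transformations are lazy without an output operation, so only the printed count is kept:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
// Requires the externally published streaming-flume artifact on the classpath
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class FlumeEventCounter {  // hypothetical class, not part of this commit
  public static void main(String[] args) throws Exception {
    // expects <host> <port> as arguments
    SparkConf conf = new SparkConf().setAppName("FlumeEventCounter");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(2000));

    // Same API as before the move: starts an Avro server at host:port
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
        FlumeUtils.createStream(ssc, args[0], Integer.parseInt(args[1]));

    // Count events per batch; print() is the output operation that triggers it
    flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();

    ssc.start();
    ssc.awaitTermination();
  }
}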
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
deleted file mode 100644
index f0ae9a99ba..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.twitter.TwitterUtils;
-import scala.Tuple2;
-import twitter4j.Status;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
- * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
- */
-public class JavaTwitterHashTagJoinSentiments {
-
- public static void main(String[] args) {
- if (args.length < 4) {
- System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
- " <access token> <access token secret> [<filters>]");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- String consumerKey = args[0];
- String consumerSecret = args[1];
- String accessToken = args[2];
- String accessTokenSecret = args[3];
- String[] filters = Arrays.copyOfRange(args, 4, args.length);
-
- // Set the system properties so that Twitter4j library used by Twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
- System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
- System.setProperty("twitter4j.oauth.accessToken", accessToken);
- System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
- JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
-
- JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
- @Override
- public Iterator<String> call(Status s) {
- return Arrays.asList(s.getText().split(" ")).iterator();
- }
- });
-
- JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
- @Override
- public Boolean call(String word) {
- return word.startsWith("#");
- }
- });
-
- // Read in the word-sentiment list and create a static RDD from it
- String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
- final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext().textFile(wordSentimentFilePath)
- .mapToPair(new PairFunction<String, String, Double>(){
- @Override
- public Tuple2<String, Double> call(String line) {
- String[] columns = line.split("\t");
- return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
- }
- });
-
- JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- // leave out the # character
- return new Tuple2<>(s.substring(1), 1);
- }
- });
-
- JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- }, new Duration(10000));
-
- // Determine the hash tags with the highest sentiment values by joining the streaming RDD
- // with the static RDD inside the transform() method and then multiplying
- // the frequency of the hash tag by its sentiment value
- JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
- hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
- JavaPairRDD<String, Tuple2<Double, Integer>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<Double, Integer>> call(
- JavaPairRDD<String, Integer> topicCount) {
- return wordSentiments.join(topicCount);
- }
- });
-
- JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
- new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
- @Override
- public Tuple2<String, Double> call(Tuple2<String,
- Tuple2<Double, Integer>> topicAndTuplePair) {
- Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
- return new Tuple2<>(topicAndTuplePair._1(),
- happinessAndCount._1() * happinessAndCount._2());
- }
- });
-
- JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
- new PairFunction<Tuple2<String, Double>, Double, String>() {
- @Override
- public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) {
- return new Tuple2<>(topicHappiness._2(),
- topicHappiness._1());
- }
- });
-
- JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
- new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
- @Override
- public JavaPairRDD<Double, String> call(
- JavaPairRDD<Double, String> happinessAndTopics) {
- return happinessAndTopics.sortByKey(false);
- }
- }
- );
-
- // Print hash tags with the most positive sentiment values
- happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
- @Override
- public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
- List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
- System.out.println(
- String.format("\nHappiest topics in last 10 seconds (%s total):",
- happinessTopicPairs.count()));
- for (Tuple2<Double, String> pair : topList) {
- System.out.println(
- String.format("%s (%s happiness)", pair._2(), pair._1()));
- }
- }
- });
-
- jssc.start();
- jssc.awaitTermination();
- }
-}
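The stream-with-static-RDD join used above outlives the Twitter connector; it works with any pair DStream. A minimal sketch of just that pattern, assuming a socket text source in place of TwitterUtils and the same AFINN file path as the deleted example; the class name is hypothetical:

import java.util.Arrays;
import java.util.Iterator;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JoinWithStaticRdd {  // hypothetical example, not part of this commit
  public static void main(String[] args) throws Exception {
    // expects <hostname> <port> as arguments
    SparkConf conf = new SparkConf().setAppName("JoinWithStaticRdd");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

    // Static reference data loaded once: word -> sentiment score
    final JavaPairRDD<String, Double> wordSentiments =
        jssc.sparkContext().textFile("data/streaming/AFINN-111.txt")
            .mapToPair(new PairFunction<String, String, Double>() {
              @Override
              public Tuple2<String, Double> call(String line) {
                String[] columns = line.split("\t");
                return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
              }
            });

    // Streaming word counts from a socket source (stand-in for the removed Twitter stream)
    JavaPairDStream<String, Integer> wordCounts = jssc
        .socketTextStream(args[0], Integer.parseInt(args[1]))
        .flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterator<String> call(String s) {
            return Arrays.asList(s.split(" ")).iterator();
          }
        })
        .mapToPair(new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
          }
        });

    // Join each streaming batch with the static RDD inside transformToPair
    JavaPairDStream<String, Tuple2<Double, Integer>> joined =
        wordCounts.transformToPair(
            new Function<JavaPairRDD<String, Integer>,
                JavaPairRDD<String, Tuple2<Double, Integer>>>() {
              @Override
              public JavaPairRDD<String, Tuple2<Double, Integer>> call(
                  JavaPairRDD<String, Integer> batch) {
                return wordSentiments.join(batch);
              }
            });

    joined.print();
    jssc.start();
    jssc.awaitTermination();
  }
}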