aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-28 13:58:09 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-04-28 13:58:09 -0700
commit1d84964bf80f4e69e54d62286c3861c2362342d0 (patch)
tree416f1576d794f1e02162fd94c784f3e9c014fa60 /examples/src
parentf735884414a15c0c07df60068ee11f9da47eff77 (diff)
downloadspark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.gz
spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.bz2
spark-1d84964bf80f4e69e54d62286c3861c2362342d0.zip
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #558 from tdas/more-fixes and squashes the following commits: c0c84e6 [Tathagata Das] Removing extra println() d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins. b7caa98 [Tathagata Das] More tweaks. d337367 [Tathagata Das] More tweaks 22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes 40a961b [Tathagata Das] Modified java test to reduce flakiness. 9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes 86d9147 [Tathagata Das] scala style fix 2f3d7b1 [Tathagata Das] Added Scala custom receiver example. d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes bec3fc2 [Tathagata Das] Added license. 51d6514 [Tathagata Das] Fixed docs on receiver. 81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java152
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java5
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala108
3 files changed, 261 insertions, 4 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
new file mode 100644
index 0000000000..a94fa621dc
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes is interpreted as
+ * text and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: JavaCustomReceiver <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run org.apache.spark.streaming.examples.JavaCustomReceiver local[2] localhost 9999`
+ */
+
+public class JavaCustomReceiver extends Receiver<String> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ public static void main(String[] args) {
+ if (args.length < 3) {
+ System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ // Create the context with a 1 second batch size
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
+ new Duration(1000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
+
+ // Create a input stream with the custom receiver on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ JavaDStream<String> lines = ssc.receiverStream(
+ new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ ssc.start();
+ ssc.awaitTermination();
+ }
+
+ // ============= Receiver code that receives data over a socket ==============
+
+ String host = null;
+ int port = -1;
+
+ public JavaCustomReceiver(String host_ , int port_) {
+ super(StorageLevel.MEMORY_AND_DISK_2());
+ host = host_;
+ port = port_;
+ }
+
+ public void onStart() {
+ // Start the thread that receives data over a connection
+ new Thread() {
+ @Override public void run() {
+ receive();
+ }
+ }.start();
+ }
+
+ public void onStop() {
+ // There is nothing much to do as the thread calling receive()
+ // is designed to stop by itself isStopped() returns false
+ }
+
+ /** Create a socket connection and receive data until receiver is stopped */
+ private void receive() {
+ Socket socket = null;
+ String userInput = null;
+
+ try {
+ // connect to the server
+ socket = new Socket(host, port);
+
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+ // Until stopped or connection broken continue reading
+ while (!isStopped() && (userInput = reader.readLine()) != null) {
+ System.out.println("Received data '" + userInput + "'");
+ store(userInput);
+ }
+ reader.close();
+ socket.close();
+
+ // Restart in an attempt to connect again when server is active again
+ restart("Trying to connect again");
+ } catch(ConnectException ce) {
+ // restart if could not connect to server
+ restart("Could not connect", ce);
+ } catch(Throwable t) {
+ restart("Error receiving data", t);
+ }
+ }
+}
+
+
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 7f68d451e9..0cc9d0ae1a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -31,7 +31,7 @@ import java.util.regex.Pattern;
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port>
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
@@ -43,9 +43,6 @@ import java.util.regex.Pattern;
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
- private JavaNetworkWordCount() {
- }
-
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
new file mode 100644
index 0000000000..eebffd8249
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import java.io.{InputStreamReader, BufferedReader, InputStream}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes is interpreted as
+ * text and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: CustomReceiver <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999`
+ */
+object CustomReceiver {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Create the context with a 1 second batch size
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+ // Create a input stream with the custom receiver on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+
+class CustomReceiver(host: String, port: Int)
+ extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+ def onStart() {
+ // Start the thread that receives data over a connection
+ new Thread("Socket Receiver") {
+ override def run() { receive() }
+ }.start()
+ }
+
+ def onStop() {
+ // There is nothing much to do as the thread calling receive()
+ // is designed to stop by itself isStopped() returns false
+ }
+
+ /** Create a socket connection and receive data until receiver is stopped */
+ private def receive() {
+ var socket: Socket = null
+ var userInput: String = null
+ try {
+ logInfo("Connecting to " + host + ":" + port)
+ socket = new Socket(host, port)
+ logInfo("Connected to " + host + ":" + port)
+ val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+ userInput = reader.readLine()
+ while(!isStopped && userInput != null) {
+ store(userInput)
+ userInput = reader.readLine()
+ }
+ reader.close()
+ socket.close()
+ logInfo("Stopped receiving")
+ restart("Trying to connect again")
+ } catch {
+ case e: java.net.ConnectException =>
+ restart("Error connecting to " + host + ":" + port, e)
+ case t: Throwable =>
+ restart("Error receiving data", t)
+ }
+ }
+}