diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-28 13:58:09 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-28 13:58:32 -0700 |
commit | 6d89faffd2ea8b1c50de81d4ece28b80d7e2be2e (patch) | |
tree | 215bfa108d7b36abc1af9b8f56fe6a6a8e559356 /examples/src/main/scala | |
parent | 42cb3b41b97a0eeea80f205b1c5c7677422f3c94 (diff) | |
download | spark-6d89faffd2ea8b1c50de81d4ece28b80d7e2be2e.tar.gz spark-6d89faffd2ea8b1c50de81d4ece28b80d7e2be2e.tar.bz2 spark-6d89faffd2ea8b1c50de81d4ece28b80d7e2be2e.zip |
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #558 from tdas/more-fixes and squashes the following commits:
c0c84e6 [Tathagata Das] Removing extra println()
d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins.
b7caa98 [Tathagata Das] More tweaks.
d337367 [Tathagata Das] More tweaks
22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
40a961b [Tathagata Das] Modified java test to reduce flakiness.
9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
86d9147 [Tathagata Das] scala style fix
2f3d7b1 [Tathagata Das] Added Scala custom receiver example.
d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
bec3fc2 [Tathagata Das] Added license.
51d6514 [Tathagata Das] Fixed docs on receiver.
81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
(cherry picked from commit 1d84964bf80f4e69e54d62286c3861c2362342d0)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala new file mode 100644 index 0000000000..eebffd8249 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.examples + +import java.io.{InputStreamReader, BufferedReader, InputStream} +import java.net.Socket + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.receiver.Receiver + +/** + * Custom Receiver that receives data over a socket. Received bytes is interpreted as + * text and \n delimited lines are considered as records. They are then counted and printed. + * + * Usage: CustomReceiver <master> <hostname> <port> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999` + */ +object CustomReceiver { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + // Create the context with a 1 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create a input stream with the custom receiver on target ip:port and count the + // words in input stream of \n delimited text (eg. generated by 'nc') + val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt)) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + ssc.awaitTermination() + } +} + + +class CustomReceiver(host: String, port: Int) + extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { + + def onStart() { + // Start the thread that receives data over a connection + new Thread("Socket Receiver") { + override def run() { receive() } + }.start() + } + + def onStop() { + // There is nothing much to do as the thread calling receive() + // is designed to stop by itself isStopped() returns false + } + + /** Create a socket connection and receive data until receiver is stopped */ + private def receive() { + var socket: Socket = null + var userInput: String = null + try { + logInfo("Connecting to " + host + ":" + port) + socket = new Socket(host, port) + logInfo("Connected to " + host + ":" + port) + val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + userInput = reader.readLine() + while(!isStopped && userInput != null) { + store(userInput) + userInput = reader.readLine() + } + reader.close() + socket.close() + logInfo("Stopped receiving") + restart("Trying to connect again") + } catch { + case e: java.net.ConnectException => + restart("Error connecting to " + host + ":" + port, e) + case t: Throwable => + restart("Error receiving data", t) + } + } +} |