From fdae095de2daa1fc3b343c05e515235756d856a4 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 6 May 2014 21:55:05 -0700 Subject: [HOTFIX] SPARK-1637: There are some Streaming examples added after the PR #571 was last updated. This resulted in Compilation Errors. cc @mateiz project not compiling currently. Author: Sandeep Closes #673 from techaddict/SPARK-1637-HOTFIX and squashes the following commits: b512f4f [Sandeep] [SPARK-1637][HOTFIX] There are some Streaming examples added after the PR #571 was last updated. This resulted in Compilation Errors. --- .../spark/examples/streaming/CustomReceiver.scala | 108 +++++++++++++++++++++ .../spark/streaming/examples/CustomReceiver.scala | 108 --------------------- 2 files changed, 108 insertions(+), 108 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala (limited to 'examples/src/main/scala') diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala new file mode 100644 index 0000000000..e317e2d36a --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import java.io.{InputStreamReader, BufferedReader, InputStream} +import java.net.Socket + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.receiver.Receiver + +/** + * Custom Receiver that receives data over a socket. Received bytes is interpreted as + * text and \n delimited lines are considered as records. They are then counted and printed. + * + * Usage: CustomReceiver + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and of the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999` + */ +object CustomReceiver { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: NetworkWordCount \n" + + "In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + // Create the context with a 1 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create a input stream with the custom receiver on target ip:port and count the + // words in input stream of \n delimited text (eg. generated by 'nc') + val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt)) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + ssc.awaitTermination() + } +} + + +class CustomReceiver(host: String, port: Int) + extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { + + def onStart() { + // Start the thread that receives data over a connection + new Thread("Socket Receiver") { + override def run() { receive() } + }.start() + } + + def onStop() { + // There is nothing much to do as the thread calling receive() + // is designed to stop by itself isStopped() returns false + } + + /** Create a socket connection and receive data until receiver is stopped */ + private def receive() { + var socket: Socket = null + var userInput: String = null + try { + logInfo("Connecting to " + host + ":" + port) + socket = new Socket(host, port) + logInfo("Connected to " + host + ":" + port) + val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + userInput = reader.readLine() + while(!isStopped && userInput != null) { + store(userInput) + userInput = reader.readLine() + } + reader.close() + socket.close() + logInfo("Stopped receiving") + restart("Trying to connect again") + } catch { + case e: java.net.ConnectException => + restart("Error connecting to " + host + ":" + port, e) + case t: Throwable => + restart("Error receiving data", t) + } + } +} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala deleted file mode 100644 index eebffd8249..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import java.io.{InputStreamReader, BufferedReader, InputStream} -import java.net.Socket - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.receiver.Receiver - -/** - * Custom Receiver that receives data over a socket. Received bytes is interpreted as - * text and \n delimited lines are considered as records. They are then counted and printed. - * - * Usage: CustomReceiver - * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. - * and of the TCP server that Spark Streaming would connect to receive data. - * - * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` - * and then run the example - * `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999` - */ -object CustomReceiver { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: NetworkWordCount \n" + - "In local mode, should be 'local[n]' with n > 1") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Create a input stream with the custom receiver on target ip:port and count the - // words in input stream of \n delimited text (eg. generated by 'nc') - val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt)) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - ssc.awaitTermination() - } -} - - -class CustomReceiver(host: String, port: Int) - extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { - - def onStart() { - // Start the thread that receives data over a connection - new Thread("Socket Receiver") { - override def run() { receive() } - }.start() - } - - def onStop() { - // There is nothing much to do as the thread calling receive() - // is designed to stop by itself isStopped() returns false - } - - /** Create a socket connection and receive data until receiver is stopped */ - private def receive() { - var socket: Socket = null - var userInput: String = null - try { - logInfo("Connecting to " + host + ":" + port) - socket = new Socket(host, port) - logInfo("Connected to " + host + ":" + port) - val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) - userInput = reader.readLine() - while(!isStopped && userInput != null) { - store(userInput) - userInput = reader.readLine() - } - reader.close() - socket.close() - logInfo("Stopped receiving") - restart("Trying to connect again") - } catch { - case e: java.net.ConnectException => - restart("Error connecting to " + host + ":" + port, e) - case t: Throwable => - restart("Error receiving data", t) - } - } -} -- cgit v1.2.3