author    Tathagata Das <tathagata.das1565@gmail.com>  2014-04-28 13:58:09 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-04-28 13:58:09 -0700
commit    1d84964bf80f4e69e54d62286c3861c2362342d0 (patch)
tree      416f1576d794f1e02162fd94c784f3e9c014fa60
parent    f735884414a15c0c07df60068ee11f9da47eff77 (diff)
download  spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.gz
          spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.bz2
          spark-1d84964bf80f4e69e54d62286c3861c2362342d0.zip
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #558 from tdas/more-fixes and squashes the following commits:

c0c84e6 [Tathagata Das] Removing extra println()
d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins.
b7caa98 [Tathagata Das] More tweaks.
d337367 [Tathagata Das] More tweaks
22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
40a961b [Tathagata Das] Modified java test to reduce flakiness.
9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
86d9147 [Tathagata Das] scala style fix
2f3d7b1 [Tathagata Das] Added Scala custom receiver example.
d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
bec3fc2 [Tathagata Das] Added license.
51d6514 [Tathagata Das] Fixed docs on receiver.
81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java        152
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java        5
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala          108
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala     2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala                90
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java               144
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala                      4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala                 3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala             2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                     1
10 files changed, 476 insertions(+), 35 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
new file mode 100644
index 0000000000..a94fa621dc
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text, and '\n'-delimited lines are treated as records. They are then counted and printed.
+ *
+ * Usage: JavaCustomReceiver <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run org.apache.spark.streaming.examples.JavaCustomReceiver local[2] localhost 9999`
+ */
+
+public class JavaCustomReceiver extends Receiver<String> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ public static void main(String[] args) {
+ if (args.length < 3) {
+ System.err.println("Usage: JavaCustomReceiver <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ // Create the context with a 1 second batch size
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaCustomReceiver",
+ new Duration(1000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaCustomReceiver.class));
+
+ // Create an input stream with the custom receiver on target ip:port and count the
+ // words in the input stream of '\n'-delimited text (e.g. generated by 'nc')
+ JavaDStream<String> lines = ssc.receiverStream(
+ new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ ssc.start();
+ ssc.awaitTermination();
+ }
+
+ // ============= Receiver code that receives data over a socket ==============
+
+ String host = null;
+ int port = -1;
+
+ public JavaCustomReceiver(String host_ , int port_) {
+ super(StorageLevel.MEMORY_AND_DISK_2());
+ host = host_;
+ port = port_;
+ }
+
+ public void onStart() {
+ // Start the thread that receives data over a connection
+ new Thread() {
+ @Override public void run() {
+ receive();
+ }
+ }.start();
+ }
+
+ public void onStop() {
+ // There is nothing much to do, as the thread calling receive()
+ // is designed to stop by itself once isStopped() returns true
+ }
+
+ /** Create a socket connection and receive data until receiver is stopped */
+ private void receive() {
+ Socket socket = null;
+ String userInput = null;
+
+ try {
+ // connect to the server
+ socket = new Socket(host, port);
+
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+ // Until stopped or connection broken continue reading
+ while (!isStopped() && (userInput = reader.readLine()) != null) {
+ System.out.println("Received data '" + userInput + "'");
+ store(userInput);
+ }
+ reader.close();
+ socket.close();
+
+ // Restart in an attempt to connect again when the server is active again
+ restart("Trying to connect again");
+ } catch(ConnectException ce) {
+ // restart if could not connect to server
+ restart("Could not connect", ce);
+ } catch(Throwable t) {
+ restart("Error receiving data", t);
+ }
+ }
+}
+
+
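To feed the example without Netcat, any server that writes newline-terminated text works. Below is a minimal stand-in for `nc -lk 9999`, as a sketch only: the class name SimpleLineServer and the emitted lines are illustrative, not part of this patch.

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Accepts one client at a time and writes a line every second,
// mimicking `nc -lk 9999` as a data source for the receiver above.
public class SimpleLineServer {
  public static void main(String[] args) throws Exception {
    int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
    try (ServerSocket server = new ServerSocket(port)) {
      while (true) {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
          // checkError() flushes and turns true once the client disconnects.
          for (int i = 0; !out.checkError(); i++) {
            out.println("hello world " + i);
            Thread.sleep(1000);
          }
        }
      }
    }
  }
}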
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 7f68d451e9..0cc9d0ae1a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -31,7 +31,7 @@ import java.util.regex.Pattern;
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port>
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
@@ -43,9 +43,6 @@ import java.util.regex.Pattern;
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
- private JavaNetworkWordCount() {
- }
-
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
new file mode 100644
index 0000000000..eebffd8249
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import java.io.{InputStreamReader, BufferedReader, InputStream}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text, and '\n'-delimited lines are treated as records. They are then counted and printed.
+ *
+ * Usage: CustomReceiver <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999`
+ */
+object CustomReceiver {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: CustomReceiver <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Create the context with a 1 second batch size
+ val ssc = new StreamingContext(args(0), "CustomReceiver", Seconds(1),
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+ // Create an input stream with the custom receiver on target ip:port and count the
+ // words in the input stream of '\n'-delimited text (e.g. generated by 'nc')
+ val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+
+class CustomReceiver(host: String, port: Int)
+ extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+ def onStart() {
+ // Start the thread that receives data over a connection
+ new Thread("Socket Receiver") {
+ override def run() { receive() }
+ }.start()
+ }
+
+ def onStop() {
+ // There is nothing much to do, as the thread calling receive()
+ // is designed to stop by itself once isStopped() returns true
+ }
+
+ /** Create a socket connection and receive data until receiver is stopped */
+ private def receive() {
+ var socket: Socket = null
+ var userInput: String = null
+ try {
+ logInfo("Connecting to " + host + ":" + port)
+ socket = new Socket(host, port)
+ logInfo("Connected to " + host + ":" + port)
+ val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+ userInput = reader.readLine()
+ while(!isStopped && userInput != null) {
+ store(userInput)
+ userInput = reader.readLine()
+ }
+ reader.close()
+ socket.close()
+ logInfo("Stopped receiving")
+ restart("Trying to connect again")
+ } catch {
+ case e: java.net.ConnectException =>
+ restart("Error connecting to " + host + ":" + port, e)
+ case t: Throwable =>
+ restart("Error receiving data", t)
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index fbb2e9f85d..75a3e9334e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -390,7 +390,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
- def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ def receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.receiverStream(receiver)
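With this signature change, Java callers of receiverStream get a JavaReceiverInputDStream back directly instead of the Scala ReceiverInputDStream. A minimal driver sketch under that assumption, reusing the JavaCustomReceiver example above (the class name ReceiverStreamDemo is hypothetical):

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.examples.JavaCustomReceiver;

public class ReceiverStreamDemo {
  public static void main(String[] args) {
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "ReceiverStreamDemo", new Duration(1000));
    // The stream is now typed as JavaReceiverInputDStream<String>,
    // so no wrapping or casting is needed on the Java side.
    JavaReceiverInputDStream<String> lines =
        jssc.receiverStream(new JavaCustomReceiver("localhost", 9999));
    lines.print();
    jssc.start();
    jssc.awaitTermination();
  }
}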
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 524c1b8d8c..b310c22b3a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -30,28 +30,55 @@ import org.apache.spark.annotation.DeveloperApi
* Abstract class of a receiver that can be run on worker nodes to receive external data. A
* custom receiver can be implemented by defining the functions onStart() and onStop(). onStart()
* should define the setup steps necessary to start receiving data,
- * and onStop() should define the cleanup steps necessary to stop receiving data. A custom
- * receiver would look something like this.
+ * and onStop() should define the cleanup steps necessary to stop receiving data.
*
- * @example {{{
+ * A custom receiver in Scala would look like this.
+ *
+ * {{{
* class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
- * def onStart() {
- * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
- * // Must start new thread to receive data, as onStart() must be non-blocking.
+ * def onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
*
- * // Call store(...) in those threads to store received data into Spark's memory.
+ * // Call store(...) in those threads to store received data into Spark's memory.
*
- * // Call stop(...), restart() or reportError(...) on any thread based on how
- * // different errors should be handled.
+ * // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ * // different errors need to be handled.
*
- * // See corresponding method documentation for more details
- * }
+ * // See corresponding method documentation for more details
+ * }
*
- * def onStop() {
- * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
- * }
+ * def onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
* }
* }}}
+ *
+ * A custom receiver in Java would look like this.
+ *
+ * {{{
+ * class MyReceiver extends Receiver<String> {
+ * public MyReceiver(StorageLevel storageLevel) {
+ * super(storageLevel);
+ * }
+ *
+ * public void onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ * // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ * // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ * // different errors need to be handled.
+ *
+ * // See corresponding method documentation for more details
+ * }
+ *
+ * public void onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
+ * }
+ * }}}
*/
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
@@ -156,30 +183,34 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after a delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread. The delay between the stopping and the starting
+ * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
* The `message` will be reported to the driver.
- * The delay is defined by the Spark configuration
- * `spark.streaming.receiverRestartDelay`.
*/
def restart(message: String) {
executor.restartReceiver(message)
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after a delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread. The delay between the stopping and the starting
+ * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
* The `message` and `exception` will be reported to the driver.
- * The delay is defined by the Spark configuration
- * `spark.streaming.receiverRestartDelay`.
*/
def restart(message: String, error: Throwable) {
executor.restartReceiver(message, Some(error))
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after the given delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread, with the given delay in milliseconds between
+ * the stopping and the starting.
*/
def restart(message: String, error: Throwable, millisecond: Int) {
executor.restartReceiver(message, Some(error), millisecond)
@@ -195,16 +226,23 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
executor.stop(message, Some(error))
}
+ /** Check if the receiver has started or not. */
def isStarted(): Boolean = {
executor.isReceiverStarted()
}
- /** Check if receiver has been marked for stopping. */
+ /**
+ * Check if the receiver has been marked for stopping. Use this to identify when
+ * the receiving of data should be stopped.
+ */
def isStopped(): Boolean = {
executor.isReceiverStopped()
}
- /** Get unique identifier of this receiver. */
+ /**
+ * Get the unique identifier of the receiver input stream that this
+ * receiver is associated with.
+ */
def streamId = id
/*
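The restart delay referenced in the docs above, `spark.streaming.receiverRestartDelay`, is an ordinary Spark configuration key, so it can be set on the SparkConf used to build the streaming context. A sketch, assuming the value is interpreted as milliseconds (the class name RestartDelayDemo is hypothetical):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RestartDelayDemo {
  public static void main(String[] args) {
    // Widen the gap between the asynchronous onStop()/onStart() pair
    // that restart(message) schedules; value assumed to be milliseconds.
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("RestartDelayDemo")
        .set("spark.streaming.receiverRestartDelay", "5000");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
    // ...attach receivers with jssc.receiverStream(...) as usual...
  }
}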
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
new file mode 100644
index 0000000000..1b0787fe69
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.spark.api.java.function.Function;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class JavaReceiverAPISuite implements Serializable {
+
+ @Before
+ public void setUp() {
+ System.clearProperty("spark.streaming.clock");
+ }
+
+ @After
+ public void tearDown() {
+ System.clearProperty("spark.streaming.clock");
+ }
+
+ @Test
+ public void testReceiver() throws InterruptedException {
+ TestServer server = new TestServer(0);
+ server.start();
+
+ final AtomicLong dataCounter = new AtomicLong(0);
+
+ try {
+ JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
+ JavaReceiverInputDStream<String> input =
+ ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
+ JavaDStream<String> mapped = input.map(new Function<String, String>() {
+ @Override
+ public String call(String v1) throws Exception {
+ return v1 + ".";
+ }
+ });
+ mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd) throws Exception {
+ long count = rdd.count();
+ dataCounter.addAndGet(count);
+ return null;
+ }
+ });
+
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ long timeout = 10000;
+
+ Thread.sleep(200);
+ for (int i = 0; i < 6; i++) {
+ server.send("" + i + "\n"); // \n to make sure these are separate lines
+ Thread.sleep(100);
+ }
+ while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
+ Thread.sleep(100);
+ }
+ ssc.stop();
+ assertTrue(dataCounter.get() > 0);
+ } finally {
+ server.stop();
+ }
+ }
+}
+
+class JavaSocketReceiver extends Receiver<String> {
+
+ String host = null;
+ int port = -1;
+
+ public JavaSocketReceiver(String host_ , int port_) {
+ super(StorageLevel.MEMORY_AND_DISK());
+ host = host_;
+ port = port_;
+ }
+
+ @Override
+ public void onStart() {
+ new Thread() {
+ @Override public void run() {
+ receive();
+ }
+ }.start();
+ }
+
+ @Override
+ public void onStop() {
+ }
+
+ private void receive() {
+ Socket socket = null;
+ try {
+ socket = new Socket(host, port);
+ BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String userInput;
+ while ((userInput = in.readLine()) != null) {
+ store(userInput);
+ }
+ in.close();
+ socket.close();
+ } catch(ConnectException ce) {
+ ce.printStackTrace();
+ restart("Could not connect", ce);
+ } catch(Throwable t) {
+ t.printStackTrace();
+ restart("Error receiving data", t);
+ }
+ }
+}
+
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 33f6df8f88..c0ea0491c3 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming._
import java.util.ArrayList
import collection.JavaConversions._
import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.streaming.dstream.DStream
/** Exposes streaming test functionality in a Java-friendly way. */
trait JavaTestBase extends TestSuiteBase {
@@ -51,8 +52,7 @@ trait JavaTestBase extends TestSuiteBase {
* [[org.apache.spark.streaming.TestOutputStream]].
**/
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
- dstream: JavaDStreamLike[T, This, R]) =
- {
+ dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index b55b7834c9..3fa254065c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -49,7 +49,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream(
+ "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3e2b25af84..ee0bc8b7d6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -165,7 +165,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
TestReceiver.counter.set(1)
- val input = ssc.networkStream(new TestReceiver)
+ val input = ssc.receiverStream(new TestReceiver)
input.count.foreachRDD(rdd => {
val count = rdd.first()
runningCount += count.toInt
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 4f63fd3782..8036f77c97 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -155,6 +155,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
def afterFunction() {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.streaming.clock")
}
before(beforeFunction)