author    Tathagata Das <tathagata.das1565@gmail.com>  2014-04-28 13:58:09 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-04-28 13:58:09 -0700
commit    1d84964bf80f4e69e54d62286c3861c2362342d0 (patch)
tree      416f1576d794f1e02162fd94c784f3e9c014fa60 /streaming
parent    f735884414a15c0c07df60068ee11f9da47eff77 (diff)
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #558 from tdas/more-fixes and squashes the following commits:

c0c84e6 [Tathagata Das] Removing extra println()
d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins.
b7caa98 [Tathagata Das] More tweaks.
d337367 [Tathagata Das] More tweaks
22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
40a961b [Tathagata Das] Modified java test to reduce flakiness.
9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
86d9147 [Tathagata Das] scala style fix
2f3d7b1 [Tathagata Das] Added Scala custom receiver example.
d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
bec3fc2 [Tathagata Das] Added license.
51d6514 [Tathagata Das] Fixed docs on receiver.
81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
Diffstat (limited to 'streaming')

 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala |   2
 streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala             |  90
 streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java            | 144
 streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala                  |   4
 streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala             |   3
 streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala         |   2
 streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                 |   1

 7 files changed, 215 insertions, 31 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index fbb2e9f85d..75a3e9334e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -390,7 +390,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
- def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ def receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.receiverStream(receiver)
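
This hunk changes only the declared return type: ssc.receiverStream(receiver) already produced the right stream, but Java callers now get the Java-friendly JavaReceiverInputDStream without a manual cast. A minimal sketch of how a Java caller would consume it, assuming a hypothetical Receiver<String> subclass named MyJavaReceiver (shaped like the example documented in Receiver.scala below):

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class ReceiverStreamSketch {
      public static void main(String[] args) {
        JavaStreamingContext jssc =
            new JavaStreamingContext("local[2]", "receiver-sketch", new Duration(1000));
        // The return type is now JavaReceiverInputDStream, so no cast is needed.
        JavaReceiverInputDStream<String> lines =
            jssc.receiverStream(new MyJavaReceiver(StorageLevel.MEMORY_AND_DISK()));
        lines.print();
        jssc.start();
        jssc.awaitTermination();
      }
    }
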
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 524c1b8d8c..b310c22b3a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -30,28 +30,55 @@ import org.apache.spark.annotation.DeveloperApi
* Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by implementing the functions onStart() and onStop(). onStart()
* should define the setup steps necessary to start receiving data,
- * and onStop() should define the cleanup steps necessary to stop receiving data. A custom
- * receiver would look something like this.
+ * and onStop() should define the cleanup steps necessary to stop receiving data.
*
- * @example {{{
+ * A custom receiver in Scala would look like this.
+ *
+ * {{{
 * class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
- * def onStart() {
- * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
- * // Must start new thread to receive data, as onStart() must be non-blocking.
+ * def onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
*
- * // Call store(...) in those threads to store received data into Spark's memory.
+ * // Call store(...) in those threads to store received data into Spark's memory.
*
- * // Call stop(...), restart() or reportError(...) on any thread based on how
- * // different errors should be handled.
+ * // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ * // different errors need to be handled.
*
- * // See corresponding method documentation for more details
- * }
+ * // See corresponding method documentation for more details
+ * }
*
- * def onStop() {
- * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
- * }
+ * def onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
* }
* }}}
+ *
+ * A custom receiver in Java would look like this.
+ *
+ * {{{
+ * class MyReceiver extends Receiver<String> {
+ * public MyReceiver(StorageLevel storageLevel) {
+ * super(storageLevel);
+ * }
+ *
+ * public void onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ * // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ * // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ * // different errors need to be handled.
+ *
+ * // See corresponding method documentation for more details
+ * }
+ *
+ * public void onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
+ * }
+ * }}}
*/
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
@@ -156,30 +183,34 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after a delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread. The delay between the stopping and the starting
+ * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
* The `message` will be reported to the driver.
- * The delay is defined by the Spark configuration
- * `spark.streaming.receiverRestartDelay`.
*/
def restart(message: String) {
executor.restartReceiver(message)
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after a delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread. The delay between the stopping and the starting
+ * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
* The `message` and `exception` will be reported to the driver.
- * The delay is defined by the Spark configuration
- * `spark.streaming.receiverRestartDelay`.
*/
def restart(message: String, error: Throwable) {
executor.restartReceiver(message, Some(error))
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after the given delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread.
*/
def restart(message: String, error: Throwable, millisecond: Int) {
executor.restartReceiver(message, Some(error), millisecond)
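
The rewritten docs above make the restart contract explicit: restart() only schedules the work and returns, while onStop() and the subsequent onStart() run on a background thread, separated by spark.streaming.receiverRestartDelay (or by the explicit delay taken by the three-argument overload). A hedged sketch of tuning that delay when building the context; the app name and values are illustrative:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class RestartDelaySketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("restart-delay-sketch")
            // Wait 5 seconds between onStop() and the asynchronous onStart().
            .set("spark.streaming.receiverRestartDelay", "5000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
        // ... attach receiver streams here, then jssc.start() ...
      }
    }
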
@@ -195,16 +226,23 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
executor.stop(message, Some(error))
}
+ /** Check whether the receiver has started. */
def isStarted(): Boolean = {
executor.isReceiverStarted()
}
- /** Check if receiver has been marked for stopping. */
+ /**
+ * Check if the receiver has been marked for stopping. Use this to identify when
+ * the receiving of data should be stopped.
+ */
def isStopped(): Boolean = {
executor.isReceiverStopped()
}
- /** Get unique identifier of this receiver. */
+ /**
+ * Get the unique identifier of the receiver input stream that this
+ * receiver is associated with.
+ */
def streamId = id
/*
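
The isStopped() doc above implies the canonical shape of a receiving thread: poll isStopped() rather than loop unconditionally, so that stop() and restart() can actually terminate it. A minimal sketch under that assumption; fetchNextRecord() is a hypothetical stand-in for whatever blocking read the real source provides:

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    class PollingReceiver extends Receiver<String> {
      public PollingReceiver(StorageLevel storageLevel) {
        super(storageLevel);
      }

      @Override
      public void onStart() {
        // onStart() must not block, so receive on a separate thread.
        new Thread() {
          @Override public void run() { receive(); }
        }.start();
      }

      @Override
      public void onStop() {
        // Nothing to do: receive() checks isStopped() and exits on its own.
      }

      private void receive() {
        try {
          while (!isStopped()) {
            String record = fetchNextRecord(); // hypothetical source read
            if (record != null) {
              store(record); // hand the record to Spark for storage
            }
          }
        } catch (Throwable t) {
          // Let the framework stop this receiver and re-launch it asynchronously.
          restart("Error receiving data", t);
        }
      }

      // Stand-in for the real source; returns null when no data is available.
      private String fetchNextRecord() {
        return null;
      }
    }
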
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
new file mode 100644
index 0000000000..1b0787fe69
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.spark.api.java.function.Function;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class JavaReceiverAPISuite implements Serializable {
+
+ @Before
+ public void setUp() {
+ System.clearProperty("spark.streaming.clock");
+ }
+
+ @After
+ public void tearDown() {
+ System.clearProperty("spark.streaming.clock");
+ }
+
+ @Test
+ public void testReceiver() throws InterruptedException {
+ TestServer server = new TestServer(0);
+ server.start();
+
+ final AtomicLong dataCounter = new AtomicLong(0);
+
+ try {
+ JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
+ JavaReceiverInputDStream<String> input =
+ ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
+ JavaDStream<String> mapped = input.map(new Function<String, String>() {
+ @Override
+ public String call(String v1) throws Exception {
+ return v1 + ".";
+ }
+ });
+ mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd) throws Exception {
+ long count = rdd.count();
+ dataCounter.addAndGet(count);
+ return null;
+ }
+ });
+
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ long timeout = 10000;
+
+ Thread.sleep(200);
+ for (int i = 0; i < 6; i++) {
+ server.send("" + i + "\n"); // \n to make sure these are separate lines
+ Thread.sleep(100);
+ }
+ while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
+ Thread.sleep(100);
+ }
+ ssc.stop();
+ assertTrue(dataCounter.get() > 0);
+ } finally {
+ server.stop();
+ }
+ }
+}
+
+class JavaSocketReceiver extends Receiver<String> {
+
+ String host = null;
+ int port = -1;
+
+ public JavaSocketReceiver(String host_ , int port_) {
+ super(StorageLevel.MEMORY_AND_DISK());
+ host = host_;
+ port = port_;
+ }
+
+ @Override
+ public void onStart() {
+ new Thread() {
+ @Override public void run() {
+ receive();
+ }
+ }.start();
+ }
+
+ @Override
+ public void onStop() {
+ }
+
+ private void receive() {
+ Socket socket = null;
+ try {
+ socket = new Socket(host, port);
+ BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String userInput;
+ while ((userInput = in.readLine()) != null) {
+ store(userInput);
+ }
+ in.close();
+ socket.close();
+ } catch(ConnectException ce) {
+ ce.printStackTrace();
+ restart("Could not connect", ce);
+ } catch(Throwable t) {
+ t.printStackTrace();
+ restart("Error receiving data", t);
+ }
+ }
+}
+
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 33f6df8f88..c0ea0491c3 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming._
import java.util.ArrayList
import collection.JavaConversions._
import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.streaming.dstream.DStream
/** Exposes streaming test functionality in a Java-friendly way. */
trait JavaTestBase extends TestSuiteBase {
@@ -51,8 +52,7 @@ trait JavaTestBase extends TestSuiteBase {
* [[org.apache.spark.streaming.TestOutputStream]].
**/
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
- dstream: JavaDStreamLike[T, This, R]) =
- {
+ dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index b55b7834c9..3fa254065c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -49,7 +49,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream(
+ "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3e2b25af84..ee0bc8b7d6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -165,7 +165,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
TestReceiver.counter.set(1)
- val input = ssc.networkStream(new TestReceiver)
+ val input = ssc.receiverStream(new TestReceiver)
input.count.foreachRDD(rdd => {
val count = rdd.first()
runningCount += count.toInt
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 4f63fd3782..8036f77c97 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -155,6 +155,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
def afterFunction() {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.streaming.clock")
}
before(beforeFunction)
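
Clearing spark.streaming.clock in afterFunction() targets exactly the flakiness this commit is fixing: the Scala suites drive batches with a manual clock, and if the property leaked across suites, the new JavaReceiverAPISuite (which relies on the real system clock) would stall waiting for ticks that never come. A hedged sketch of the pattern, with the clock class name taken from Spark's streaming test utilities of this era:

    // Deterministic suites pin the clock to a manually advanced one...
    System.setProperty("spark.streaming.clock",
        "org.apache.spark.streaming.util.ManualClock");
    // ...and must clear it afterwards so wall-clock suites are unaffected.
    System.clearProperty("spark.streaming.clock");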