diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-28 13:58:09 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-28 13:58:09 -0700 |
commit | 1d84964bf80f4e69e54d62286c3861c2362342d0 (patch) | |
tree | 416f1576d794f1e02162fd94c784f3e9c014fa60 /streaming/src | |
parent | f735884414a15c0c07df60068ee11f9da47eff77 (diff) | |
download | spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.gz spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.bz2 spark-1d84964bf80f4e69e54d62286c3861c2362342d0.zip |
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #558 from tdas/more-fixes and squashes the following commits:
c0c84e6 [Tathagata Das] Removing extra println()
d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins.
b7caa98 [Tathagata Das] More tweaks.
d337367 [Tathagata Das] More tweaks
22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
40a961b [Tathagata Das] Modified java test to reduce flakiness.
9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
86d9147 [Tathagata Das] scala style fix
2f3d7b1 [Tathagata Das] Added Scala custom receiver example.
d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
bec3fc2 [Tathagata Das] Added license.
51d6514 [Tathagata Das] Fixed docs on receiver.
81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
Diffstat (limited to 'streaming/src')
7 files changed, 215 insertions, 31 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index fbb2e9f85d..75a3e9334e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -390,7 +390,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver */ - def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = { + def receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.receiverStream(receiver) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 524c1b8d8c..b310c22b3a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -30,28 +30,55 @@ import org.apache.spark.annotation.DeveloperApi * Abstract class of a receiver that can be run on worker nodes to receive external data. A * custom receiver can be defined by defining the functions onStart() and onStop(). onStart() * should define the setup steps necessary to start receiving data, - * and onStop() should define the cleanup steps necessary to stop receiving data. A custom - * receiver would look something like this. + * and onStop() should define the cleanup steps necessary to stop receiving data. * - * @example {{{ + * A custom receiver in Scala would look like this. + * + * {{{ * class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) { - * def onStart() { - * // Setup stuff (start threads, open sockets, etc.) to start receiving data. - * // Must start new thread to receive data, as onStart() must be non-blocking. + * def onStart() { + * // Setup stuff (start threads, open sockets, etc.) to start receiving data. + * // Must start new thread to receive data, as onStart() must be non-blocking. * - * // Call store(...) in those threads to store received data into Spark's memory. + * // Call store(...) in those threads to store received data into Spark's memory. * - * // Call stop(...), restart() or reportError(...) on any thread based on how - * // different errors should be handled. + * // Call stop(...), restart(...) or reportError(...) on any thread based on how + * // different errors needs to be handled. * - * // See corresponding method documentation for more details - * } + * // See corresponding method documentation for more details + * } * - * def onStop() { - * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data. - * } + * def onStop() { + * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data. + * } * } * }}} + * + * A custom receiver in Java would look like this. + * + * {{{ + * class MyReceiver extends Receiver<String> { + * public MyReceiver(StorageLevel storageLevel) { + * super(storageLevel); + * } + * + * public void onStart() { + * // Setup stuff (start threads, open sockets, etc.) to start receiving data. + * // Must start new thread to receive data, as onStart() must be non-blocking. + * + * // Call store(...) in those threads to store received data into Spark's memory. + * + * // Call stop(...), restart(...) or reportError(...) on any thread based on how + * // different errors needs to be handled. + * + * // See corresponding method documentation for more details + * } + * + * public void onStop() { + * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data. + * } + * } + * }}} */ @DeveloperApi abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable { @@ -156,30 +183,34 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** - * Restart the receiver. This will call `onStop()` immediately and return. - * Asynchronously, after a delay, `onStart()` will be called. + * Restart the receiver. This method schedules the restart and returns + * immediately. The stopping and subsequent starting of the receiver + * (by calling `onStop()` and `onStart()`) is performed asynchronously + * in a background thread. The delay between the stopping and the starting + * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`. * The `message` will be reported to the driver. - * The delay is defined by the Spark configuration - * `spark.streaming.receiverRestartDelay`. */ def restart(message: String) { executor.restartReceiver(message) } /** - * Restart the receiver. This will call `onStop()` immediately and return. - * Asynchronously, after a delay, `onStart()` will be called. + * Restart the receiver. This method schedules the restart and returns + * immediately. The stopping and subsequent starting of the receiver + * (by calling `onStop()` and `onStart()`) is performed asynchronously + * in a background thread. The delay between the stopping and the starting + * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`. * The `message` and `exception` will be reported to the driver. - * The delay is defined by the Spark configuration - * `spark.streaming.receiverRestartDelay`. */ def restart(message: String, error: Throwable) { executor.restartReceiver(message, Some(error)) } /** - * Restart the receiver. This will call `onStop()` immediately and return. - * Asynchronously, after the given delay, `onStart()` will be called. + * Restart the receiver. This method schedules the restart and returns + * immediately. The stopping and subsequent starting of the receiver + * (by calling `onStop()` and `onStart()`) is performed asynchronously + * in a background thread. */ def restart(message: String, error: Throwable, millisecond: Int) { executor.restartReceiver(message, Some(error), millisecond) @@ -195,16 +226,23 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable executor.stop(message, Some(error)) } + /** Check if the receiver has started or not. */ def isStarted(): Boolean = { executor.isReceiverStarted() } - /** Check if receiver has been marked for stopping. */ + /** + * Check if receiver has been marked for stopping. Use this to identify when + * the receiving of data should be stopped. + */ def isStopped(): Boolean = { executor.isReceiverStopped() } - /** Get unique identifier of this receiver. */ + /** + * Get the unique identifier the receiver input stream that this + * receiver is associated with. + */ def streamId = id /* diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java new file mode 100644 index 0000000000..1b0787fe69 --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; +import org.apache.spark.api.java.function.Function; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.ConnectException; +import java.net.Socket; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class JavaReceiverAPISuite implements Serializable { + + @Before + public void setUp() { + System.clearProperty("spark.streaming.clock"); + } + + @After + public void tearDown() { + System.clearProperty("spark.streaming.clock"); + } + + @Test + public void testReceiver() throws InterruptedException { + TestServer server = new TestServer(0); + server.start(); + + final AtomicLong dataCounter = new AtomicLong(0); + + try { + JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200)); + JavaReceiverInputDStream<String> input = + ssc.receiverStream(new JavaSocketReceiver("localhost", server.port())); + JavaDStream<String> mapped = input.map(new Function<String, String>() { + @Override + public String call(String v1) throws Exception { + return v1 + "."; + } + }); + mapped.foreachRDD(new Function<JavaRDD<String>, Void>() { + @Override + public Void call(JavaRDD<String> rdd) throws Exception { + long count = rdd.count(); + dataCounter.addAndGet(count); + return null; + } + }); + + ssc.start(); + long startTime = System.currentTimeMillis(); + long timeout = 10000; + + Thread.sleep(200); + for (int i = 0; i < 6; i++) { + server.send("" + i + "\n"); // \n to make sure these are separate lines + Thread.sleep(100); + } + while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) { + Thread.sleep(100); + } + ssc.stop(); + assertTrue(dataCounter.get() > 0); + } finally { + server.stop(); + } + } +} + +class JavaSocketReceiver extends Receiver<String> { + + String host = null; + int port = -1; + + public JavaSocketReceiver(String host_ , int port_) { + super(StorageLevel.MEMORY_AND_DISK()); + host = host_; + port = port_; + } + + @Override + public void onStart() { + new Thread() { + @Override public void run() { + receive(); + } + }.start(); + } + + @Override + public void onStop() { + } + + private void receive() { + Socket socket = null; + try { + socket = new Socket(host, port); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String userInput; + while ((userInput = in.readLine()) != null) { + store(userInput); + } + in.close(); + socket.close(); + } catch(ConnectException ce) { + ce.printStackTrace(); + restart("Could not connect", ce); + } catch(Throwable t) { + t.printStackTrace(); + restart("Error receiving data", t); + } + } +} + diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 33f6df8f88..c0ea0491c3 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -26,6 +26,7 @@ import org.apache.spark.streaming._ import java.util.ArrayList import collection.JavaConversions._ import org.apache.spark.api.java.JavaRDDLike +import org.apache.spark.streaming.dstream.DStream /** Exposes streaming test functionality in a Java-friendly way. */ trait JavaTestBase extends TestSuiteBase { @@ -51,8 +52,7 @@ trait JavaTestBase extends TestSuiteBase { * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = - { + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index b55b7834c9..3fa254065c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -49,7 +49,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) - val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream( + "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 3e2b25af84..ee0bc8b7d6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -165,7 +165,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc = new StreamingContext(sc, Milliseconds(100)) var runningCount = 0 TestReceiver.counter.set(1) - val input = ssc.networkStream(new TestReceiver) + val input = ssc.receiverStream(new TestReceiver) input.count.foreachRDD(rdd => { val count = rdd.first() runningCount += count.toInt diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 4f63fd3782..8036f77c97 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -155,6 +155,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { def afterFunction() { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.streaming.clock") } before(beforeFunction) |