aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-28 13:58:09 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-04-28 13:58:09 -0700
commit1d84964bf80f4e69e54d62286c3861c2362342d0 (patch)
tree416f1576d794f1e02162fd94c784f3e9c014fa60 /streaming/src/main
parentf735884414a15c0c07df60068ee11f9da47eff77 (diff)
downloadspark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.gz
spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.bz2
spark-1d84964bf80f4e69e54d62286c3861c2362342d0.zip
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #558 from tdas/more-fixes and squashes the following commits: c0c84e6 [Tathagata Das] Removing extra println() d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins. b7caa98 [Tathagata Das] More tweaks. d337367 [Tathagata Das] More tweaks 22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes 40a961b [Tathagata Das] Modified java test to reduce flakiness. 9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes 86d9147 [Tathagata Das] scala style fix 2f3d7b1 [Tathagata Das] Added Scala custom receiver example. d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes bec3fc2 [Tathagata Das] Added license. 51d6514 [Tathagata Das] Fixed docs on receiver. 81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala90
2 files changed, 65 insertions, 27 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index fbb2e9f85d..75a3e9334e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -390,7 +390,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
- def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ def receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.receiverStream(receiver)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 524c1b8d8c..b310c22b3a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -30,28 +30,55 @@ import org.apache.spark.annotation.DeveloperApi
* Abstract class of a receiver that can be run on worker nodes to receive external data. A
* custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
* should define the setup steps necessary to start receiving data,
- * and onStop() should define the cleanup steps necessary to stop receiving data. A custom
- * receiver would look something like this.
+ * and onStop() should define the cleanup steps necessary to stop receiving data.
*
- * @example {{{
+ * A custom receiver in Scala would look like this.
+ *
+ * {{{
* class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
- * def onStart() {
- * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
- * // Must start new thread to receive data, as onStart() must be non-blocking.
+ * def onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
*
- * // Call store(...) in those threads to store received data into Spark's memory.
+ * // Call store(...) in those threads to store received data into Spark's memory.
*
- * // Call stop(...), restart() or reportError(...) on any thread based on how
- * // different errors should be handled.
+ * // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ * // different errors need to be handled.
*
- * // See corresponding method documentation for more details
- * }
+ * // See corresponding method documentation for more details
+ * }
*
- * def onStop() {
- * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
- * }
+ * def onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
* }
* }}}
+ *
+ * A custom receiver in Java would look like this.
+ *
+ * {{{
+ * class MyReceiver extends Receiver<String> {
+ * public MyReceiver(StorageLevel storageLevel) {
+ * super(storageLevel);
+ * }
+ *
+ * public void onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ * // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ * // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ * // different errors need to be handled.
+ *
+ * // See corresponding method documentation for more details
+ * }
+ *
+ * public void onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
+ * }
+ * }}}
*/
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
@@ -156,30 +183,34 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after a delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread. The delay between the stopping and the starting
+ * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
* The `message` will be reported to the driver.
- * The delay is defined by the Spark configuration
- * `spark.streaming.receiverRestartDelay`.
*/
def restart(message: String) {
executor.restartReceiver(message)
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after a delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread. The delay between the stopping and the starting
+ * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
* The `message` and `exception` will be reported to the driver.
- * The delay is defined by the Spark configuration
- * `spark.streaming.receiverRestartDelay`.
*/
def restart(message: String, error: Throwable) {
executor.restartReceiver(message, Some(error))
}
/**
- * Restart the receiver. This will call `onStop()` immediately and return.
- * Asynchronously, after the given delay, `onStart()` will be called.
+ * Restart the receiver. This method schedules the restart and returns
+ * immediately. The stopping and subsequent starting of the receiver
+ * (by calling `onStop()` and `onStart()`) is performed asynchronously
+ * in a background thread. The delay between the two is `millisecond` milliseconds.
*/
def restart(message: String, error: Throwable, millisecond: Int) {
executor.restartReceiver(message, Some(error), millisecond)
@@ -195,16 +226,23 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
executor.stop(message, Some(error))
}
+ /** Check if the receiver has started or not. */
def isStarted(): Boolean = {
executor.isReceiverStarted()
}
- /** Check if receiver has been marked for stopping. */
+ /**
+ * Check if receiver has been marked for stopping. Use this to identify when
+ * the receiving of data should be stopped.
+ */
def isStopped(): Boolean = {
executor.isReceiverStopped()
}
- /** Get unique identifier of this receiver. */
+ /**
+ * Get the unique identifier of the receiver input stream that this
+ * receiver is associated with.
+ */
def streamId = id
/*