aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-24 21:34:37 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-04-24 21:34:37 -0700
commitcd12dd9bde91303d0341180e5f70d2a03d6b65b6 (patch)
tree5e9d6510edaa1222e09355c239925cc7559c817d /streaming
parent968c0187a12f5ae4a696c02c1ff088e998ed7edd (diff)
downloadspark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.tar.gz
spark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.tar.bz2
spark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.zip
[SPARK-1617] and [SPARK-1618] Improvements to streaming ui and bug fix to socket receiver
1617: These changes expose the receiver state (active or inactive) and last error in the UI. 1618: If the socket receiver cannot connect in the first attempt, it should try to restart after a delay. That was broken, as the thread that restarts (hence, stops) the receiver waited on Thread.join on itself! Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #540 from tdas/streaming-ui-fix and squashes the following commits: e469434 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-ui-fix dbddf75 [Tathagata Das] Style fix. 66df1a5 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-ui-fix ad98bc9 [Tathagata Das] Refactored streaming listener to use ReceiverInfo. d7f849c [Tathagata Das] Revert "Moved BatchInfo from streaming.scheduler to streaming.ui" 5c80919 [Tathagata Das] Moved BatchInfo from streaming.scheduler to streaming.ui da244f6 [Tathagata Das] Fixed socket receiver as well as made receiver state and error visible in the streaming UI.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala49
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala58
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala24
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala37
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala40
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala25
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala18
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala20
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala8
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala15
14 files changed, 216 insertions, 102 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 1e32727eac..8b72bcf206 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag](
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
- var socket: Socket = null
- var receivingThread: Thread = null
-
def onStart() {
- receivingThread = new Thread("Socket Receiver") {
- override def run() {
- connect()
- receive()
- }
- }
- receivingThread.start()
+ // Start the thread that receives data over a connection
+ new Thread("Socket Receiver") {
+ setDaemon(true)
+ override def run() { receive() }
+ }.start()
}
def onStop() {
- if (socket != null) {
- socket.close()
- }
- socket = null
- if (receivingThread != null) {
- receivingThread.join()
- }
+ // There is nothing much to do as the thread calling receive()
+ // is designed to stop by itself once isStopped() returns true
}
- def connect() {
+ /** Create a socket connection and receive data until receiver is stopped */
+ def receive() {
+ var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
- } catch {
- case e: Exception =>
- restart("Could not connect to " + host + ":" + port, e)
- }
- }
-
- def receive() {
- try {
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
+ logInfo("Stopped receiving")
+ restart("Retrying connecting to " + host + ":" + port)
} catch {
- case e: Exception =>
- restart("Error receiving data from socket", e)
+ case e: java.net.ConnectException =>
+ restart("Error connecting to " + host + ":" + port, e)
+ case t: Throwable =>
+ restart("Error receiving data", t)
+ } finally {
+ if (socket != null) {
+ socket.close()
+ logInfo("Closed socket to " + host + ":" + port)
+ }
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index 821cf19481..743be58950 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart}
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.StorageLevel
import java.nio.ByteBuffer
+import org.apache.spark.annotation.DeveloperApi
-/** A helper with set of defaults for supervisor strategy */
+/**
+ * :: DeveloperApi ::
+ * A helper with set of defaults for supervisor strategy
+ */
+@DeveloperApi
object ActorSupervisorStrategy {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
@@ -40,6 +45,7 @@ object ActorSupervisorStrategy {
}
/**
+ * :: DeveloperApi ::
* A receiver trait to be mixed in with your Actor to gain access to
* the API for pushing received data into Spark Streaming for being processed.
*
@@ -61,6 +67,7 @@ object ActorSupervisorStrategy {
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* should be same.
*/
+@DeveloperApi
trait ActorHelper {
self: Actor => // to ensure that this can be added to Actor classes only
@@ -92,10 +99,12 @@ trait ActorHelper {
}
/**
+ * :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
* [[org.apache.spark.streaming.receiver.ActorHelper]].
*/
+@DeveloperApi
case class Statistics(numberOfMsgs: Int,
numberOfWorkers: Int,
numberOfHiccups: Int,
@@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag](
supervisor ! PoisonPill
}
}
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 44eecf1dd2..524c1b8d8c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.annotation.DeveloperApi
/**
+ * :: DeveloperApi ::
* Abstract class of a receiver that can be run on worker nodes to receive external data. A
* custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
* should define the setup steps necessary to start receiving data,
@@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
* }
* }}}
*/
+@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
/**
@@ -198,7 +201,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
/** Check if receiver has been marked for stopping. */
def isStopped(): Boolean = {
- !executor.isReceiverStarted()
+ executor.isReceiverStopped()
}
/** Get unique identifier of this receiver. */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index 6ab3ca6ea5..bf39d1e891 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
package org.apache.spark.streaming.receiver
/** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] object StopReceiver extends NetworkReceiverMessage
+private[streaming] sealed trait ReceiverMessage
+private[streaming] object StopReceiver extends ReceiverMessage
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 256b3335e4..09be3a50d2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -88,15 +88,29 @@ private[streaming] abstract class ReceiverSupervisor(
/** Report errors. */
def reportError(message: String, throwable: Throwable)
- /** Start the executor */
+ /** Called when supervisor is started */
+ protected def onStart() { }
+
+ /** Called when supervisor is stopped */
+ protected def onStop(message: String, error: Option[Throwable]) { }
+
+ /** Called when receiver is started */
+ protected def onReceiverStart() { }
+
+ /** Called when receiver is stopped */
+ protected def onReceiverStop(message: String, error: Option[Throwable]) { }
+
+ /** Start the supervisor */
def start() {
+ onStart()
startReceiver()
}
- /** Mark the executor and the receiver for stopping */
+ /** Mark the supervisor and the receiver for stopping */
def stop(message: String, error: Option[Throwable]) {
stoppingError = error.orNull
stopReceiver(message, error)
+ onStop(message, error)
stopLatch.countDown()
}
@@ -104,6 +118,8 @@ private[streaming] abstract class ReceiverSupervisor(
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
+ receiver.onStart()
+ logInfo("Called receiver onStart")
onReceiverStart()
receiverState = Started
} catch {
@@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor(
/** Stop receiver */
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
try {
+ logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
receiverState = Stopped
+ receiver.onStop()
+ logInfo("Called receiver onStop")
onReceiverStop(message, error)
} catch {
case t: Throwable =>
@@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor(
/** Restart receiver with delay */
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
- logWarning("Restarting receiver with delay " + delay + " ms: " + message,
- error.getOrElse(null))
- stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
- future {
+ Future {
+ logWarning("Restarting receiver with delay " + delay + " ms: " + message,
+ error.getOrElse(null))
+ stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
logDebug("Sleeping for " + delay)
Thread.sleep(delay)
- logDebug("Starting receiver again")
+ logInfo("Starting receiver again")
startReceiver()
logInfo("Receiver started again")
}
}
- /** Called when the receiver needs to be started */
- protected def onReceiverStart(): Unit = synchronized {
- // Call user-defined onStart()
- logInfo("Calling receiver onStart")
- receiver.onStart()
- logInfo("Called receiver onStart")
- }
-
- /** Called when the receiver needs to be stopped */
- protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
- // Call user-defined onStop()
- logInfo("Calling receiver onStop")
- receiver.onStop()
- logInfo("Called receiver onStop")
- }
-
/** Check if receiver has been marked for stopping */
def isReceiverStarted() = {
logDebug("state = " + receiverState)
receiverState == Started
}
- /** Wait the thread until the executor is stopped */
+ /** Check if receiver has been marked for stopping */
+ def isReceiverStopped() = {
+ logDebug("state = " + receiverState)
+ receiverState == Stopped
+ }
+
+
+ /** Wait the thread until the supervisor is stopped */
def awaitTermination() {
stopLatch.await()
logInfo("Waiting for executor stop is over")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 2a3521bd46..ce8316bb14 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl(
logInfo("Received stop signal")
stop("Stopped by driver", None)
}
+
+ def ref = self
}), "Receiver-" + streamId + "-" + System.currentTimeMillis())
/** Unique block ids if one wants to add blocks directly */
@@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl(
logWarning("Reported error " + message + " - " + error)
}
- override def onReceiverStart() {
+ override protected def onStart() {
blockGenerator.start()
- super.onReceiverStart()
}
- override def onReceiverStop(message: String, error: Option[Throwable]) {
- super.onReceiverStop(message, error)
+ override protected def onStop(message: String, error: Option[Throwable]) {
blockGenerator.stop()
+ env.actorSystem.stop(actor)
+ }
+
+ override protected def onReceiverStart() {
+ val msg = RegisterReceiver(
+ streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
+ val future = trackerActor.ask(msg)(askTimeout)
+ Await.result(future, askTimeout)
+ }
+
+ override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
val future = trackerActor.ask(
@@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl(
logInfo("Stopped receiver " + streamId)
}
- override def stop(message: String, error: Option[Throwable]) {
- super.stop(message, error)
- env.actorSystem.stop(actor)
- }
-
/** Generate new block ID */
private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 9c69a2a4e2..a68aecb881 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.scheduler
import org.apache.spark.streaming.Time
+import org.apache.spark.annotation.DeveloperApi
/**
+ * :: DeveloperApi ::
* Class having information on completed batches.
* @param batchTime Time of the batch
* @param submissionTime Clock time of when jobs of this batch was submitted to
@@ -27,6 +29,7 @@ import org.apache.spark.streaming.Time
* @param processingStartTime Clock time of when the first job of this batch started processing
* @param processingEndTime Clock time of when the last job of this batch finished processing
*/
+@DeveloperApi
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
new file mode 100644
index 0000000000..d7e39c528c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import akka.actor.ActorRef
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Class having information about a receiver
+ */
+@DeveloperApi
+case class ReceiverInfo(
+ streamId: Int,
+ name: String,
+ private[streaming] val actor: ActorRef,
+ active: Boolean,
+ location: String,
+ lastErrorMessage: String = "",
+ lastError: String = ""
+ ) {
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 557e0961d5..5307fe189d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -28,13 +28,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.util.AkkaUtils
-/** Information about receiver */
-case class ReceiverInfo(streamId: Int, typ: String, location: String) {
- override def toString = s"$typ-$streamId"
-}
-
/** Information about blocks received by the receiver */
-case class ReceivedBlockInfo(
+private[streaming] case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
@@ -69,7 +64,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
val receiverInputStreams = ssc.graph.getReceiverInputStreams()
val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverLauncher()
- val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
+ val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)
@@ -129,17 +124,23 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
if (!receiverInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
- receiverInfo += ((streamId, receiverActor))
- ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
- ReceiverInfo(streamId, typ, host)
- ))
+ receiverInfo(streamId) = ReceiverInfo(
+ streamId, s"${typ}-${streamId}", receiverActor, true, host)
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
}
/** Deregister a receiver */
def deregisterReceiver(streamId: Int, message: String, error: String) {
- receiverInfo -= streamId
- ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error))
+ val newReceiverInfo = receiverInfo.get(streamId) match {
+ case Some(oldInfo) =>
+ oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
+ case None =>
+ logWarning("No prior receiver info")
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ }
+ receiverInfo(streamId) = newReceiverInfo
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
@@ -157,7 +158,15 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
/** Report error sent by a receiver */
def reportError(streamId: Int, message: String, error: String) {
- ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error))
+ val newReceiverInfo = receiverInfo.get(streamId) match {
+ case Some(oldInfo) =>
+ oldInfo.copy(lastErrorMessage = message, lastError = error)
+ case None =>
+ logWarning("No prior receiver info")
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ }
+ receiverInfo(streamId) = newReceiverInfo
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
@@ -270,7 +279,8 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
/** Stops the receivers. */
private def stopReceivers() {
// Signal the receivers to stop
- receiverInfo.values.foreach(_ ! StopReceiver)
+ receiverInfo.values.flatMap { info => Option(info.actor)}
+ .foreach { _ ! StopReceiver }
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 9d6ec1fa33..ed1aa114e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -20,28 +20,45 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable.Queue
import org.apache.spark.util.Distribution
+import org.apache.spark.annotation.DeveloperApi
-/** Base trait for events related to StreamingListener */
+/**
+ * :: DeveloperApi ::
+ * Base trait for events related to StreamingListener
+ */
+@DeveloperApi
sealed trait StreamingListenerEvent
+@DeveloperApi
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+@DeveloperApi
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+@DeveloperApi
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+@DeveloperApi
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
-case class StreamingListenerReceiverError(streamId: Int, message: String, error: String)
+
+@DeveloperApi
+case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
-case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String)
+
+@DeveloperApi
+case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
/**
+ * :: DeveloperApi ::
* A listener interface for receiving information about an ongoing streaming
* computation.
*/
+@DeveloperApi
trait StreamingListener {
/** Called when a receiver has been started */
@@ -65,9 +82,11 @@ trait StreamingListener {
/**
+ * :: DeveloperApi ::
* A simple StreamingListener that logs summary statistics across Spark Streaming batches
* @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
*/
+@DeveloperApi
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
// Queue containing latest completed batches
val batchInfos = new Queue[BatchInfo]()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 14c33c728b..f61069b56d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -23,9 +23,9 @@ import scala.collection.mutable.{Queue, HashMap}
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
-import org.apache.spark.streaming.scheduler.ReceiverInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
+import org.apache.spark.Logging
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -40,9 +40,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
val batchDuration = ssc.graph.batchDuration.milliseconds
- override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
synchronized {
- receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
+ receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo
+ }
+ }
+
+ override def onReceiverError(receiverError: StreamingListenerReceiverError) {
+ synchronized {
+ receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
+ }
+ }
+
+ override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
+ synchronized {
+ receiverInfos(receiverStopped.receiverInfo.streamId) = receiverStopped.receiverInfo
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 8fe1219356..451b23e01c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -78,25 +78,33 @@ private[ui] class StreamingPage(parent: StreamingTab)
val table = if (receivedRecordDistributions.size > 0) {
val headerRow = Seq(
"Receiver",
+ "Status",
"Location",
"Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
"Minimum rate\n[records/sec]",
- "25th percentile rate\n[records/sec]",
"Median rate\n[records/sec]",
- "75th percentile rate\n[records/sec]",
- "Maximum rate\n[records/sec]"
+ "Maximum rate\n[records/sec]",
+ "Last Error"
)
val dataRows = (0 until listener.numReceivers).map { receiverId =>
val receiverInfo = listener.receiverInfo(receiverId)
- val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
+ val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
+ val receiverActive = receiverInfo.map { info =>
+ if (info.active) "ACTIVE" else "INACTIVE"
+ }.getOrElse(emptyCell)
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
- d.getQuantiles().map(r => formatNumber(r.toLong))
+ d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
}.getOrElse {
Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
}
- Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
+ val receiverLastError = listener.receiverInfo(receiverId).map { info =>
+ val msg = s"${info.lastErrorMessage} - ${info.lastError}"
+ if (msg.size > 100) msg.take(97) + "..." else msg
+ }.getOrElse(emptyCell)
+ Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
+ receivedRecordStats ++ Seq(receiverLastError)
}
Some(listingTable(headerRow, dataRows))
} else {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index ff3619a590..303d149d28 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -94,9 +94,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
// Verify restarting actually stops and starts the receiver
receiver.restart("restarting", null, 100)
- assert(receiver.isStopped)
- assert(receiver.onStopCalled)
+ eventually(timeout(50 millis), interval(10 millis)) {
+ // receiver will be stopped async
+ assert(receiver.isStopped)
+ assert(receiver.onStopCalled)
+ }
eventually(timeout(1000 millis), interval(100 millis)) {
+ // receiver will be started async
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
assert(receiver.isStarted)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 458dd3a2b1..ef0efa552c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -66,7 +66,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
test("receiver info reporting") {
val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
- val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)
val collector = new ReceiverInfoCollector
@@ -75,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
ssc.start()
try {
eventually(timeout(1000 millis), interval(20 millis)) {
- collector.startedReceiverInfo should have size 1
- collector.startedReceiverInfo(0).streamId should equal (0)
+ collector.startedReceiverStreamIds.size should be >= 1
+ collector.startedReceiverStreamIds(0) should equal (0)
collector.stoppedReceiverStreamIds should have size 1
collector.stoppedReceiverStreamIds(0) should equal (0)
collector.receiverErrors should have size 1
@@ -108,20 +108,21 @@ class BatchInfoCollector extends StreamingListener {
/** Listener that collects information on processed batches */
class ReceiverInfoCollector extends StreamingListener {
- val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+ val startedReceiverStreamIds = new ArrayBuffer[Int]
val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
val receiverErrors = new ArrayBuffer[(Int, String, String)]()
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
- startedReceiverInfo += receiverStarted.receiverInfo
+ startedReceiverStreamIds += receiverStarted.receiverInfo.streamId
}
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
- stoppedReceiverStreamIds += receiverStopped.streamId
+ stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId
}
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
- receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
+ receiverErrors += ((receiverError.receiverInfo.streamId,
+ receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError))
}
}