From cd12dd9bde91303d0341180e5f70d2a03d6b65b6 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Apr 2014 21:34:37 -0700 Subject: [SPARK-1617] and [SPARK-1618] Improvements to streaming ui and bug fix to socket receiver 1617: These changes expose the receiver state (active or inactive) and last error in the UI 1618: If the socket receiver cannot connect in the first attempt, it should try to restart after a delay. That was broken, as the thread that restarts (hence, stops) the receiver waited on Thread.join on itself! Author: Tathagata Das Closes #540 from tdas/streaming-ui-fix and squashes the following commits: e469434 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-ui-fix dbddf75 [Tathagata Das] Style fix. 66df1a5 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-ui-fix ad98bc9 [Tathagata Das] Refactored streaming listener to use ReceiverInfo. d7f849c [Tathagata Das] Revert "Moved BatchInfo from streaming.scheduler to streaming.ui" 5c80919 [Tathagata Das] Moved BatchInfo from streaming.scheduler to streaming.ui da244f6 [Tathagata Das] Fixed socket receiver as well as made receiver state and error visible in the streamign UI. --- .../streaming/dstream/SocketInputDStream.scala | 49 ++++++++---------- .../spark/streaming/receiver/ActorReceiver.scala | 12 ++++- .../apache/spark/streaming/receiver/Receiver.scala | 5 +- .../spark/streaming/receiver/ReceiverMessage.scala | 4 +- .../streaming/receiver/ReceiverSupervisor.scala | 58 +++++++++++++--------- .../receiver/ReceiverSupervisorImpl.scala | 24 +++++---- .../spark/streaming/scheduler/BatchInfo.scala | 3 ++ .../spark/streaming/scheduler/ReceiverInfo.scala | 37 ++++++++++++++ .../streaming/scheduler/ReceiverTracker.scala | 40 +++++++++------ .../streaming/scheduler/StreamingListener.scala | 25 ++++++++-- .../ui/StreamingJobProgressListener.scala | 18 +++++-- .../apache/spark/streaming/ui/StreamingPage.scala | 20 +++++--- .../spark/streaming/NetworkReceiverSuite.scala | 8 ++- .../spark/streaming/StreamingListenerSuite.scala | 15 +++--- 14 files changed, 216 insertions(+), 102 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 1e32727eac..8b72bcf206 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag]( storageLevel: StorageLevel ) extends Receiver[T](storageLevel) with Logging { - var socket: Socket = null - var receivingThread: Thread = null - def onStart() { - receivingThread = new Thread("Socket Receiver") { - override def run() { - connect() - receive() - } - } - receivingThread.start() + // Start the thread that receives data over a connection + new Thread("Socket Receiver") { + setDaemon(true) + override def run() { receive() } + }.start() } def onStop() { - if (socket != null) { - socket.close() - } - socket = null - if (receivingThread != null) { - receivingThread.join() - } + // There is nothing much to do as the thread calling receive() + // is designed to stop by itself isStopped() returns false } - def connect() { + /** Create a socket connection and receive data until receiver is stopped */ + def receive() { + var socket: Socket = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) - } catch { - case e: Exception => - restart("Could not connect to " + host + ":" + port, e) - } - } - - def receive() { - try { logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } + logInfo("Stopped receiving") + restart("Retrying connecting to " + host + ":" + port) } catch { - case e: Exception => - restart("Error receiving data from socket", e) + case e: java.net.ConnectException => + restart("Error connecting to " + host + ":" + port, e) + case t: Throwable => + restart("Error receiving data", t) + } finally { + if (socket != null) { + socket.close() + logInfo("Closed socket to " + host + ":" + port) + } } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala index 821cf19481..743be58950 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala @@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart} import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.storage.StorageLevel import java.nio.ByteBuffer +import org.apache.spark.annotation.DeveloperApi -/** A helper with set of defaults for supervisor strategy */ +/** + * :: DeveloperApi :: + * A helper with set of defaults for supervisor strategy + */ +@DeveloperApi object ActorSupervisorStrategy { val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = @@ -40,6 +45,7 @@ object ActorSupervisorStrategy { } /** + * :: DeveloperApi :: * A receiver trait to be mixed in with your Actor to gain access to * the API for pushing received data into Spark Streaming for being processed. * @@ -61,6 +67,7 @@ object ActorSupervisorStrategy { * to ensure the type safety, i.e parametrized type of push block and InputDStream * should be same. */ +@DeveloperApi trait ActorHelper { self: Actor => // to ensure that this can be added to Actor classes only @@ -92,10 +99,12 @@ trait ActorHelper { } /** + * :: DeveloperApi :: * Statistics for querying the supervisor about state of workers. Used in * conjunction with `StreamingContext.actorStream` and * [[org.apache.spark.streaming.receiver.ActorHelper]]. */ +@DeveloperApi case class Statistics(numberOfMsgs: Int, numberOfWorkers: Int, numberOfHiccups: Int, @@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag]( supervisor ! PoisonPill } } - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 44eecf1dd2..524c1b8d8c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.annotation.DeveloperApi /** + * :: DeveloperApi :: * Abstract class of a receiver that can be run on worker nodes to receive external data. A * custom receiver can be defined by defining the functions onStart() and onStop(). onStart() * should define the setup steps necessary to start receiving data, @@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel * } * }}} */ +@DeveloperApi abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable { /** @@ -198,7 +201,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable /** Check if receiver has been marked for stopping. */ def isStopped(): Boolean = { - !executor.isReceiverStarted() + executor.isReceiverStopped() } /** Get unique identifier of this receiver. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala index 6ab3ca6ea5..bf39d1e891 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala @@ -18,6 +18,6 @@ package org.apache.spark.streaming.receiver /** Messages sent to the NetworkReceiver. */ -private[streaming] sealed trait NetworkReceiverMessage -private[streaming] object StopReceiver extends NetworkReceiverMessage +private[streaming] sealed trait ReceiverMessage +private[streaming] object StopReceiver extends ReceiverMessage diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 256b3335e4..09be3a50d2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -88,15 +88,29 @@ private[streaming] abstract class ReceiverSupervisor( /** Report errors. */ def reportError(message: String, throwable: Throwable) - /** Start the executor */ + /** Called when supervisor is started */ + protected def onStart() { } + + /** Called when supervisor is stopped */ + protected def onStop(message: String, error: Option[Throwable]) { } + + /** Called when receiver is started */ + protected def onReceiverStart() { } + + /** Called when receiver is stopped */ + protected def onReceiverStop(message: String, error: Option[Throwable]) { } + + /** Start the supervisor */ def start() { + onStart() startReceiver() } - /** Mark the executor and the receiver for stopping */ + /** Mark the supervisor and the receiver for stopping */ def stop(message: String, error: Option[Throwable]) { stoppingError = error.orNull stopReceiver(message, error) + onStop(message, error) stopLatch.countDown() } @@ -104,6 +118,8 @@ private[streaming] abstract class ReceiverSupervisor( def startReceiver(): Unit = synchronized { try { logInfo("Starting receiver") + receiver.onStart() + logInfo("Called receiver onStart") onReceiverStart() receiverState = Started } catch { @@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor( /** Stop receiver */ def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized { try { + logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse("")) receiverState = Stopped + receiver.onStop() + logInfo("Called receiver onStop") onReceiverStop(message, error) } catch { case t: Throwable => @@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor( /** Restart receiver with delay */ def restartReceiver(message: String, error: Option[Throwable], delay: Int) { - logWarning("Restarting receiver with delay " + delay + " ms: " + message, - error.getOrElse(null)) - stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error) - future { + Future { + logWarning("Restarting receiver with delay " + delay + " ms: " + message, + error.getOrElse(null)) + stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error) logDebug("Sleeping for " + delay) Thread.sleep(delay) - logDebug("Starting receiver again") + logInfo("Starting receiver again") startReceiver() logInfo("Receiver started again") } } - /** Called when the receiver needs to be started */ - protected def onReceiverStart(): Unit = synchronized { - // Call user-defined onStart() - logInfo("Calling receiver onStart") - receiver.onStart() - logInfo("Called receiver onStart") - } - - /** Called when the receiver needs to be stopped */ - protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized { - // Call user-defined onStop() - logInfo("Calling receiver onStop") - receiver.onStop() - logInfo("Called receiver onStop") - } - /** Check if receiver has been marked for stopping */ def isReceiverStarted() = { logDebug("state = " + receiverState) receiverState == Started } - /** Wait the thread until the executor is stopped */ + /** Check if receiver has been marked for stopping */ + def isReceiverStopped() = { + logDebug("state = " + receiverState) + receiverState == Stopped + } + + + /** Wait the thread until the supervisor is stopped */ def awaitTermination() { stopLatch.await() logInfo("Waiting for executor stop is over") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 2a3521bd46..ce8316bb14 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl( logInfo("Received stop signal") stop("Stopped by driver", None) } + + def ref = self }), "Receiver-" + streamId + "-" + System.currentTimeMillis()) /** Unique block ids if one wants to add blocks directly */ @@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl( logWarning("Reported error " + message + " - " + error) } - override def onReceiverStart() { + override protected def onStart() { blockGenerator.start() - super.onReceiverStart() } - override def onReceiverStop(message: String, error: Option[Throwable]) { - super.onReceiverStop(message, error) + override protected def onStop(message: String, error: Option[Throwable]) { blockGenerator.stop() + env.actorSystem.stop(actor) + } + + override protected def onReceiverStart() { + val msg = RegisterReceiver( + streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor) + val future = trackerActor.ask(msg)(askTimeout) + Await.result(future, askTimeout) + } + + override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") val future = trackerActor.ask( @@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl( logInfo("Stopped receiver " + streamId) } - override def stop(message: String, error: Option[Throwable]) { - super.stop(message, error) - env.actorSystem.stop(actor) - } - /** Generate new block ID */ private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 9c69a2a4e2..a68aecb881 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -18,8 +18,10 @@ package org.apache.spark.streaming.scheduler import org.apache.spark.streaming.Time +import org.apache.spark.annotation.DeveloperApi /** + * :: DeveloperApi :: * Class having information on completed batches. * @param batchTime Time of the batch * @param submissionTime Clock time of when jobs of this batch was submitted to @@ -27,6 +29,7 @@ import org.apache.spark.streaming.Time * @param processingStartTime Clock time of when the first job of this batch started processing * @param processingEndTime Clock time of when the last job of this batch finished processing */ +@DeveloperApi case class BatchInfo( batchTime: Time, receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala new file mode 100644 index 0000000000..d7e39c528c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import akka.actor.ActorRef +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * Class having information about a receiver + */ +@DeveloperApi +case class ReceiverInfo( + streamId: Int, + name: String, + private[streaming] val actor: ActorRef, + active: Boolean, + location: String, + lastErrorMessage: String = "", + lastError: String = "" + ) { +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 557e0961d5..5307fe189d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -28,13 +28,8 @@ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver} import org.apache.spark.util.AkkaUtils -/** Information about receiver */ -case class ReceiverInfo(streamId: Int, typ: String, location: String) { - override def toString = s"$typ-$streamId" -} - /** Information about blocks received by the receiver */ -case class ReceivedBlockInfo( +private[streaming] case class ReceivedBlockInfo( streamId: Int, blockId: StreamBlockId, numRecords: Long, @@ -69,7 +64,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { val receiverInputStreams = ssc.graph.getReceiverInputStreams() val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverLauncher() - val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef] + val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo] val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] val timeout = AkkaUtils.askTimeout(ssc.conf) @@ -129,17 +124,23 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { if (!receiverInputStreamMap.contains(streamId)) { throw new Exception("Register received for unexpected id " + streamId) } - receiverInfo += ((streamId, receiverActor)) - ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted( - ReceiverInfo(streamId, typ, host) - )) + receiverInfo(streamId) = ReceiverInfo( + streamId, s"${typ}-${streamId}", receiverActor, true, host) + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId))) logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address) } /** Deregister a receiver */ def deregisterReceiver(streamId: Int, message: String, error: String) { - receiverInfo -= streamId - ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error)) + val newReceiverInfo = receiverInfo.get(streamId) match { + case Some(oldInfo) => + oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error) + case None => + logWarning("No prior receiver info") + ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) + } + receiverInfo(streamId) = newReceiverInfo + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId))) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else { @@ -157,7 +158,15 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { /** Report error sent by a receiver */ def reportError(streamId: Int, message: String, error: String) { - ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error)) + val newReceiverInfo = receiverInfo.get(streamId) match { + case Some(oldInfo) => + oldInfo.copy(lastErrorMessage = message, lastError = error) + case None => + logWarning("No prior receiver info") + ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) + } + receiverInfo(streamId) = newReceiverInfo + ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId))) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else { @@ -270,7 +279,8 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { /** Stops the receivers. */ private def stopReceivers() { // Signal the receivers to stop - receiverInfo.values.foreach(_ ! StopReceiver) + receiverInfo.values.flatMap { info => Option(info.actor)} + .foreach { _ ! StopReceiver } logInfo("Sent stop signal to all " + receiverInfo.size + " receivers") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 9d6ec1fa33..ed1aa114e1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -20,28 +20,45 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.Queue import org.apache.spark.util.Distribution +import org.apache.spark.annotation.DeveloperApi -/** Base trait for events related to StreamingListener */ +/** + * :: DeveloperApi :: + * Base trait for events related to StreamingListener + */ +@DeveloperApi sealed trait StreamingListenerEvent +@DeveloperApi case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent + +@DeveloperApi case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent + +@DeveloperApi case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent +@DeveloperApi case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) extends StreamingListenerEvent -case class StreamingListenerReceiverError(streamId: Int, message: String, error: String) + +@DeveloperApi +case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo) extends StreamingListenerEvent -case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String) + +@DeveloperApi +case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) extends StreamingListenerEvent /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent /** + * :: DeveloperApi :: * A listener interface for receiving information about an ongoing streaming * computation. */ +@DeveloperApi trait StreamingListener { /** Called when a receiver has been started */ @@ -65,9 +82,11 @@ trait StreamingListener { /** + * :: DeveloperApi :: * A simple StreamingListener that logs summary statistics across Spark Streaming batches * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) */ +@DeveloperApi class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { // Queue containing latest completed batches val batchInfos = new Queue[BatchInfo]() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 14c33c728b..f61069b56d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -23,9 +23,9 @@ import scala.collection.mutable.{Queue, HashMap} import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted import org.apache.spark.streaming.scheduler.BatchInfo -import org.apache.spark.streaming.scheduler.ReceiverInfo import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted import org.apache.spark.util.Distribution +import org.apache.spark.Logging private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) @@ -40,9 +40,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) val batchDuration = ssc.graph.batchDuration.milliseconds - override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = { + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { synchronized { - receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo) + receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo + } + } + + override def onReceiverError(receiverError: StreamingListenerReceiverError) { + synchronized { + receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo + } + } + + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { + synchronized { + receiverInfos(receiverStopped.receiverInfo.streamId) = receiverStopped.receiverInfo } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 8fe1219356..451b23e01c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -78,25 +78,33 @@ private[ui] class StreamingPage(parent: StreamingTab) val table = if (receivedRecordDistributions.size > 0) { val headerRow = Seq( "Receiver", + "Status", "Location", "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]", "Minimum rate\n[records/sec]", - "25th percentile rate\n[records/sec]", "Median rate\n[records/sec]", - "75th percentile rate\n[records/sec]", - "Maximum rate\n[records/sec]" + "Maximum rate\n[records/sec]", + "Last Error" ) val dataRows = (0 until listener.numReceivers).map { receiverId => val receiverInfo = listener.receiverInfo(receiverId) - val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId") + val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId") + val receiverActive = receiverInfo.map { info => + if (info.active) "ACTIVE" else "INACTIVE" + }.getOrElse(emptyCell) val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell) val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId)) val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles().map(r => formatNumber(r.toLong)) + d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong)) }.getOrElse { Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell) } - Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats + val receiverLastError = listener.receiverInfo(receiverId).map { info => + val msg = s"${info.lastErrorMessage} - ${info.lastError}" + if (msg.size > 100) msg.take(97) + "..." else msg + }.getOrElse(emptyCell) + Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++ + receivedRecordStats ++ Seq(receiverLastError) } Some(listingTable(headerRow, dataRows)) } else { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala index ff3619a590..303d149d28 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala @@ -94,9 +94,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { // Verify restarting actually stops and starts the receiver receiver.restart("restarting", null, 100) - assert(receiver.isStopped) - assert(receiver.onStopCalled) + eventually(timeout(50 millis), interval(10 millis)) { + // receiver will be stopped async + assert(receiver.isStopped) + assert(receiver.onStopCalled) + } eventually(timeout(1000 millis), interval(100 millis)) { + // receiver will be started async assert(receiver.onStartCalled) assert(executor.isReceiverStarted) assert(receiver.isStarted) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 458dd3a2b1..ef0efa552c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -66,7 +66,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { test("receiver info reporting") { val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) - val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver) + val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) inputStream.foreachRDD(_.count) val collector = new ReceiverInfoCollector @@ -75,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { ssc.start() try { eventually(timeout(1000 millis), interval(20 millis)) { - collector.startedReceiverInfo should have size 1 - collector.startedReceiverInfo(0).streamId should equal (0) + collector.startedReceiverStreamIds.size should be >= 1 + collector.startedReceiverStreamIds(0) should equal (0) collector.stoppedReceiverStreamIds should have size 1 collector.stoppedReceiverStreamIds(0) should equal (0) collector.receiverErrors should have size 1 @@ -108,20 +108,21 @@ class BatchInfoCollector extends StreamingListener { /** Listener that collects information on processed batches */ class ReceiverInfoCollector extends StreamingListener { - val startedReceiverInfo = new ArrayBuffer[ReceiverInfo] + val startedReceiverStreamIds = new ArrayBuffer[Int] val stoppedReceiverStreamIds = new ArrayBuffer[Int]() val receiverErrors = new ArrayBuffer[(Int, String, String)]() override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - startedReceiverInfo += receiverStarted.receiverInfo + startedReceiverStreamIds += receiverStarted.receiverInfo.streamId } override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { - stoppedReceiverStreamIds += receiverStopped.streamId + stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId } override def onReceiverError(receiverError: StreamingListenerReceiverError) { - receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error)) + receiverErrors += ((receiverError.receiverInfo.streamId, + receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError)) } } -- cgit v1.2.3