author      Tathagata Das <tathagata.das1565@gmail.com>   2014-04-11 23:33:49 -0700
committer   Patrick Wendell <pwendell@gmail.com>          2014-04-11 23:33:49 -0700
commit      6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6 (patch)
tree        66542ff62c23f52366ea2e25e346467bff14c97b /streaming
parent      165e06a74c3d75e6b7341c120943add8b035b96a (diff)
download    spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.tar.gz
            spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.tar.bz2
            spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.zip
[SPARK-1386] Web UI for Spark Streaming
When debugging Spark Streaming applications it is necessary to monitor certain metrics that are not shown in the Spark application UI. For example: what is the average processing time of batches? What is the scheduling delay? Is the system able to process data as fast as it is receiving it? How many records am I receiving through my receivers? While the StreamingListener interface introduced in 0.9 provided some of this information, it could only be accessed programmatically. A UI that shows information specific to streaming applications is necessary for easier debugging. This PR introduces such a UI. It shows various statistics related to the streaming application. Here is a screenshot of the UI running on my local machine.

http://i.imgur.com/1ooDGhm.png

This UI is integrated into the Spark UI running at port 4040.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #290 from tdas/streaming-web-ui and squashes the following commits:

fc73ca5 [Tathagata Das] Merge pull request #9 from andrewor14/ui-refactor
642dd88 [Andrew Or] Merge SparkUISuite.scala into UISuite.scala
eb30517 [Andrew Or] Merge github.com:apache/spark into ui-refactor
f4f4cbe [Tathagata Das] More minor fixes.
34bb364 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
252c566 [Tathagata Das] Merge pull request #8 from andrewor14/ui-refactor
e038b4b [Tathagata Das] Addressed Patrick's comments.
125a054 [Andrew Or] Disable serving static resources with gzip
90feb8d [Andrew Or] Address Patrick's comments
89dae36 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
72fe256 [Tathagata Das] Merge pull request #6 from andrewor14/ui-refactor
2fc09c8 [Tathagata Das] Added binary check exclusions
aa396d4 [Andrew Or] Rename tabs and pages (No more IndexPage.scala)
f8e1053 [Tathagata Das] Added Spark and Streaming UI unit tests.
caa5e05 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
585cd65 [Tathagata Das] Merge pull request #5 from andrewor14/ui-refactor
914b8ff [Tathagata Das] Moved utils functions to UIUtils.
548c98c [Andrew Or] Wide refactoring of WebUI, UITab, and UIPage (see commit message)
6de06b0 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
ee6543f [Tathagata Das] Minor changes based on Andrew's comments.
fa760fe [Tathagata Das] Fixed long line.
1c0bcef [Tathagata Das] Refactored streaming UI into two files.
1af239b [Tathagata Das] Changed streaming UI to attach itself as a tab with the Spark UI.
827e81a [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
168fe86 [Tathagata Das] Merge pull request #2 from andrewor14/ui-refactor
3e986f8 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
c78c92d [Andrew Or] Remove outdated comment
8f7323b [Andrew Or] End of file new lines, indentation, and imports (minor)
0d61ee8 [Andrew Or] Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor
9a48fa1 [Andrew Or] Allow adding tabs to SparkUI dynamically + add example
61358e3 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-web-ui
53be2c5 [Tathagata Das] Minor style updates.
ed25dfc [Andrew Or] Generalize SparkUI header to display tabs dynamically
a37ad4f [Andrew Or] Comments, imports and formatting (minor)
cd000b0 [Andrew Or] Merge github.com:apache/spark into ui-refactor
7d57444 [Andrew Or] Refactoring the UI interface to add flexibility
aef4dd5 [Tathagata Das] Added Apache licenses.
db27bad [Tathagata Das] Added last batch processing time to StreamingUI.
4d86e98 [Tathagata Das] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later.
93f1c69 [Tathagata Das] Added network receiver information to the Streaming UI.
56cc7fb [Tathagata Das] First cut implementation of Streaming UI.
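
Before this page existed, the per-batch metrics mentioned above could only be obtained programmatically through the StreamingListener interface. A minimal sketch of such a listener, assuming the BatchInfo fields used elsewhere in this patch (processingDelay and schedulingDelay are Option[Long]); the class name BatchDelayLogger is hypothetical:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Hypothetical listener: print, per completed batch, the delays the new UI now charts.
    class BatchDelayLogger extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        val info = batchCompleted.batchInfo
        println("Batch " + info.batchTime +
          ": processing delay = " + info.processingDelay.getOrElse(-1L) + " ms" +
          ", scheduling delay = " + info.schedulingDelay.getOrElse(-1L) + " ms")
      }
    }

    // Registration (ssc is an existing StreamingContext):
    // ssc.addStreamingListener(new BatchDelayLogger())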
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                |  23
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                 |   9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala     |  79
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala             |   1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala          |   9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala          |  11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala                |   7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala   |  86
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala     |  18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala  |   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala | 148
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala                | 180
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala                 |  36
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala               |   6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala           |   1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala                         |  46
16 files changed, 576 insertions(+), 88 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a4e236c65f..ff5d0aaa3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,29 +17,28 @@
package org.apache.spark.streaming
-import scala.collection.mutable.Queue
-import scala.collection.Map
-import scala.reflect.ClassTag
-
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
+import scala.collection.Map
+import scala.collection.mutable.Queue
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
-import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.util.MetadataCleaner
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -158,6 +157,8 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
+ private[streaming] val uiTab = new StreamingTab(this)
+
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d043200f71..a7e5215437 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -353,15 +353,6 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.clearMetadata(time))
}
- /* Adds metadata to the Stream while it is running.
- * This method should be overwritten by sublcasses of InputDStream.
- */
- private[streaming] def addMetadata(metadata: Any) {
- if (metadata != null) {
- logInfo("Dropping Metadata: " + metadata.toString)
- }
- }
-
/**
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d19a635fe8..5a249706b4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -17,24 +17,23 @@
package org.apache.spark.streaming.dstream
-import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import java.nio.ByteBuffer
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.Await
-import scala.concurrent.duration._
import scala.reflect.ClassTag
-import akka.actor.{Props, Actor}
+import akka.actor.{Actor, Props}
import akka.pattern.ask
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
-import org.apache.spark.streaming._
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.rdd.{RDD, BlockRDD}
+import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -49,8 +48,10 @@ import org.apache.spark.util.AkkaUtils
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
- // This is an unique identifier that is used to match the network receiver with the
- // corresponding network input stream.
+ /** Keeps all received blocks information */
+ private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+
+ /** This is an unique identifier for the network input stream. */
val id = ssc.getNewNetworkStreamId()
/**
@@ -65,25 +66,44 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
def stop() {}
+ /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
- val blockIds = ssc.scheduler.networkInputTracker.getBlocks(id, validTime)
+ val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
+ receivedBlockInfo(validTime) = blockInfo
+ val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
+
+ /** Get information on received blocks. */
+ private[streaming] def getReceivedBlockInfo(time: Time) = {
+ receivedBlockInfo(time)
+ }
+
+ /**
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This
+ * implementation overrides the default implementation to clear received
+ * block information.
+ */
+ private[streaming] override def clearMetadata(time: Time) {
+ super.clearMetadata(time)
+ val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
+ receivedBlockInfo --= oldReceivedBlocks.keys
+ logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+ }
}
private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] case class StopReceiver() extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
- extends NetworkReceiverMessage
-private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
/**
* Abstract class of a receiver that can be run on worker nodes to receive external data. See
@@ -177,6 +197,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
}.mkString("\n")
}
+
logInfo("Deregistering receiver " + streamId)
val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
Await.result(future, askTimeout)
@@ -209,18 +230,28 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/**
* Push a block (as an ArrayBuffer filled with data) into the block manager.
*/
- def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+ def pushBlock(
+ blockId: StreamBlockId,
+ arrayBuffer: ArrayBuffer[T],
+ metadata: Any,
+ level: StorageLevel
+ ) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
- trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
logDebug("Pushed block " + blockId)
}
/**
* Push a block (as bytes) into the block manager.
*/
- def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
+ def pushBlock(
+ blockId: StreamBlockId,
+ bytes: ByteBuffer,
+ metadata: Any,
+ level: StorageLevel
+ ) {
env.blockManager.putBytes(blockId, bytes, level)
- trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
}
/** Set the ID of the DStream that this receiver is associated with */
@@ -232,9 +263,11 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
private class NetworkReceiverActor extends Actor {
override def preStart() {
- logInfo("Registered receiver " + streamId)
- val future = trackerActor.ask(RegisterReceiver(streamId, self))(askTimeout)
+ val msg = RegisterReceiver(
+ streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
+ val future = trackerActor.ask(msg)(askTimeout)
Await.result(future, askTimeout)
+ logInfo("Registered receiver " + streamId)
}
override def receive() = {
@@ -253,7 +286,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {
- case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
+ case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 7f3cd2f8eb..9c69a2a4e2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
*/
case class BatchInfo(
batchTime: Time,
+ receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 92d885c4bc..e564eccba2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -201,7 +201,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
- jobScheduler.runJobs(time, graph.generateJobs(time))
+ jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)
// Restart the timer
@@ -214,7 +214,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
- jobScheduler.runJobs(time, jobs)
+ val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
+ val streamId = stream.id
+ val receivedBlockInfo = stream.getReceivedBlockInfo(time)
+ (streamId, receivedBlockInfo)
+ }.toMap
+ jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 04e0a6a283..d9ada99b47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -100,14 +100,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
logInfo("Stopped JobScheduler")
}
- def runJobs(time: Time, jobs: Seq[Job]) {
- if (jobs.isEmpty) {
- logInfo("No jobs added for time " + time)
+ def submitJobSet(jobSet: JobSet) {
+ if (jobSet.jobs.isEmpty) {
+ logInfo("No jobs added for time " + jobSet.time)
} else {
- val jobSet = new JobSet(time, jobs)
- jobSets.put(time, jobSet)
+ jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
- logInfo("Added jobs for time " + time)
+ logInfo("Added jobs for time " + jobSet.time)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index fcf303aee6..a69d743621 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time
* belong to the same batch.
*/
private[streaming]
-case class JobSet(time: Time, jobs: Seq[Job]) {
+case class JobSet(
+ time: Time,
+ jobs: Seq[Job],
+ receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
+ ) {
private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
+ receivedBlockInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 067e804202..a1e6f51768 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -17,20 +17,42 @@
package org.apache.spark.streaming.scheduler
-import scala.collection.mutable.{HashMap, Queue, SynchronizedMap}
+import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import akka.actor._
+
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
import org.apache.spark.util.AkkaUtils
+/** Information about receiver */
+case class ReceiverInfo(streamId: Int, typ: String, location: String) {
+ override def toString = s"$typ-$streamId"
+}
+
+/** Information about blocks received by the network receiver */
+case class ReceivedBlockInfo(
+ streamId: Int,
+ blockId: StreamBlockId,
+ numRecords: Long,
+ metadata: Any
+ )
+
+/**
+ * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
+ * with each other.
+ */
private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
- extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+private[streaming] case class RegisterReceiver(
+ streamId: Int,
+ typ: String,
+ host: String,
+ receiverActor: ActorRef
+ ) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
extends NetworkInputTrackerMessage
@@ -47,9 +69,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
- val receivedBlockIds = new HashMap[Int, Queue[BlockId]] with SynchronizedMap[Int, Queue[BlockId]]
+ val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
+ with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)
-
+ val listenerBus = ssc.scheduler.listenerBus
// actor is created when generator starts.
// This not being null means the tracker has been started and not stopped
@@ -83,12 +106,32 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}
}
+ /** Return all the blocks received from a receiver. */
+ def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
+ val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
+ logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
+ receivedBlockInfo.toArray
+ }
+
+ private def getReceivedBlockInfoQueue(streamId: Int) = {
+ receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
+ }
+
/** Register a receiver */
- def registerReceiver(streamId: Int, receiverActor: ActorRef, sender: ActorRef) {
+ def registerReceiver(
+ streamId: Int,
+ typ: String,
+ host: String,
+ receiverActor: ActorRef,
+ sender: ActorRef
+ ) {
if (!networkInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
+ ReceiverInfo(streamId, typ, host)
+ ))
logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
}
@@ -98,35 +141,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message)
}
- /** Get all the received blocks for the given stream. */
- def getBlocks(streamId: Int, time: Time): Array[BlockId] = {
- val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]())
- val result = queue.dequeueAll(x => true).toArray
- logInfo("Stream " + streamId + " received " + result.size + " blocks")
- result
- }
-
/** Add new blocks for the given stream */
- def addBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) = {
- val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId])
- queue ++= blockIds
- networkInputStreamMap(streamId).addMetadata(metadata)
- logDebug("Stream " + streamId + " received new blocks: " + blockIds.mkString("[", ", ", "]"))
+ def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
+ getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
+ logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
+ receivedBlockInfo.blockId)
}
/** Check if any blocks are left to be processed */
def hasMoreReceivedBlockIds: Boolean = {
- !receivedBlockIds.forall(_._2.isEmpty)
+ !receivedBlockInfo.values.forall(_.isEmpty)
}
/** Actor to receive messages from the receivers. */
private class NetworkInputTrackerActor extends Actor {
def receive = {
- case RegisterReceiver(streamId, receiverActor) =>
- registerReceiver(streamId, receiverActor, sender)
+ case RegisterReceiver(streamId, typ, host, receiverActor) =>
+ registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
- case AddBlocks(streamId, blockIds, metadata) =>
- addBlocks(streamId, blockIds, metadata)
+ case AddBlock(receivedBlockInfo) =>
+ addBlocks(receivedBlockInfo)
case DeregisterReceiver(streamId, message) =>
deregisterReceiver(streamId, message)
sender ! true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 461ea35064..5db40ebbeb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -23,8 +23,11 @@ import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
sealed trait StreamingListenerEvent
+case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
+ extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
@@ -34,14 +37,17 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
* computation.
*/
trait StreamingListener {
- /**
- * Called when processing of a batch has completed
- */
+
+ /** Called when a receiver has been started */
+ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
- /**
- * Called when processing of a batch has started
- */
+ /** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
}
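
The new StreamingListenerReceiverStarted event carries the ReceiverInfo (stream id, receiver type, location) defined in NetworkInputTracker.scala above, so client code can observe where receivers run. A small sketch of a listener using it; ReceiverLocationLogger is a hypothetical name:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}

    // Hypothetical listener: record where each network receiver was started.
    class ReceiverLocationLogger extends StreamingListener {
      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
        val info = receiverStarted.receiverInfo  // ReceiverInfo(streamId, typ, location)
        println("Receiver " + info + " started on " + info.location)
      }
    }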
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 18811fc2b0..ea03dfc7bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging {
while (true) {
val event = eventQueue.take
event match {
+ case receiverStarted: StreamingListenerReceiverStarted =>
+ listeners.foreach(_.onReceiverStarted(receiverStarted))
+ case batchSubmitted: StreamingListenerBatchSubmitted =>
+ listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
new file mode 100644
index 0000000000..8b025b09ed
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.{Time, StreamingContext}
+import org.apache.spark.streaming.scheduler._
+import scala.collection.mutable.{Queue, HashMap}
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.scheduler.ReceiverInfo
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
+import org.apache.spark.util.Distribution
+
+
+private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
+
+ private val waitingBatchInfos = new HashMap[Time, BatchInfo]
+ private val runningBatchInfos = new HashMap[Time, BatchInfo]
+ private val completedaBatchInfos = new Queue[BatchInfo]
+ private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ private var totalCompletedBatches = 0L
+ private val receiverInfos = new HashMap[Int, ReceiverInfo]
+
+ val batchDuration = ssc.graph.batchDuration.milliseconds
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
+ synchronized {
+ receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
+ }
+ }
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
+ runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
+ runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
+ waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
+ waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ completedaBatchInfos.enqueue(batchCompleted.batchInfo)
+ if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+ totalCompletedBatches += 1L
+ }
+
+ def numNetworkReceivers = synchronized {
+ ssc.graph.getNetworkInputStreams().size
+ }
+
+ def numTotalCompletedBatches: Long = synchronized {
+ totalCompletedBatches
+ }
+
+ def numUnprocessedBatches: Long = synchronized {
+ waitingBatchInfos.size + runningBatchInfos.size
+ }
+
+ def waitingBatches: Seq[BatchInfo] = synchronized {
+ waitingBatchInfos.values.toSeq
+ }
+
+ def runningBatches: Seq[BatchInfo] = synchronized {
+ runningBatchInfos.values.toSeq
+ }
+
+ def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
+ completedaBatchInfos.toSeq
+ }
+
+ def processingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.processingDelay)
+ }
+
+ def schedulingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.schedulingDelay)
+ }
+
+ def totalDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.totalDelay)
+ }
+
+ def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
+ val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
+ val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
+ (0 until numNetworkReceivers).map { receiverId =>
+ val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
+ batchInfo.get(receiverId).getOrElse(Array.empty)
+ }
+ val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
+ // calculate records per second for each batch
+ blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
+ }
+ val distributionOption = Distribution(recordsOfParticularReceiver)
+ (receiverId, distributionOption)
+ }.toMap
+ }
+
+ def lastReceivedBatchRecords: Map[Int, Long] = {
+ val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
+ lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
+ (0 until numNetworkReceivers).map { receiverId =>
+ (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+ }.toMap
+ }.getOrElse {
+ (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
+ }
+ }
+
+ def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+ receiverInfos.get(receiverId)
+ }
+
+ def lastCompletedBatch: Option[BatchInfo] = {
+ completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+ }
+
+ def lastReceivedBatch: Option[BatchInfo] = {
+ retainedBatches.lastOption
+ }
+
+ private def retainedBatches: Seq[BatchInfo] = synchronized {
+ (waitingBatchInfos.values.toSeq ++
+ runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
+ }
+
+ private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+ Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ }
+}
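
StreamingPage (below) is the only consumer of this listener in the patch, but the same accessors could be queried directly from code in the org.apache.spark.streaming.ui package (the class is private[ui]). A rough sketch, assuming `listener` is the StreamingJobProgressListener attached by StreamingTab and that Distribution.getQuantiles() returns the five default quantiles (minimum, 25th percentile, median, 75th percentile, maximum) that the page's table headers expect:

    // Median processing time over the retained completed batches, in ms (hypothetical usage).
    val medianProcessingTimeMs: Option[Double] =
      listener.processingDelayDistribution.map(_.getQuantiles()(2))

    // Records seen by each receiver in the most recently received batch.
    val lastBatchRecords: Map[Int, Long] = listener.lastReceivedBatchRecords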
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
new file mode 100644
index 0000000000..6607437db5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.Logging
+import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.util.Distribution
+
+/** Page for Spark Web UI that shows statistics of a streaming job */
+private[ui] class StreamingPage(parent: StreamingTab)
+ extends WebUIPage("") with Logging {
+
+ private val listener = parent.listener
+ private val startTime = Calendar.getInstance().getTime()
+ private val emptyCell = "-"
+
+ /** Render the page */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val content =
+ generateBasicStats() ++ <br></br> ++
+ <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
+ generateNetworkStatsTable() ++
+ generateBatchStatsTable()
+ UIUtils.headerSparkPage(
+ content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
+ }
+
+ /** Generate basic stats of the streaming program */
+ private def generateBasicStats(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+ <ul class ="unstyled">
+ <li>
+ <strong>Started at: </strong> {startTime.toString}
+ </li>
+ <li>
+ <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+ </li>
+ <li>
+ <strong>Network receivers: </strong>{listener.numNetworkReceivers}
+ </li>
+ <li>
+ <strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
+ </li>
+ <li>
+ <strong>Processed batches: </strong>{listener.numTotalCompletedBatches}
+ </li>
+ <li>
+ <strong>Waiting batches: </strong>{listener.numUnprocessedBatches}
+ </li>
+ </ul>
+ }
+
+ /** Generate stats of data received over the network the streaming program */
+ private def generateNetworkStatsTable(): Seq[Node] = {
+ val receivedRecordDistributions = listener.receivedRecordsDistributions
+ val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
+ val table = if (receivedRecordDistributions.size > 0) {
+ val headerRow = Seq(
+ "Receiver",
+ "Location",
+ "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
+ "Minimum rate\n[records/sec]",
+ "25th percentile rate\n[records/sec]",
+ "Median rate\n[records/sec]",
+ "75th percentile rate\n[records/sec]",
+ "Maximum rate\n[records/sec]"
+ )
+ val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
+ val receiverInfo = listener.receiverInfo(receiverId)
+ val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
+ val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
+ val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
+ val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
+ d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
+ }.getOrElse {
+ Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
+ }
+ Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
+ }
+ Some(listingTable(headerRow, dataRows))
+ } else {
+ None
+ }
+
+ val content =
+ <h5>Network Input Statistics</h5> ++
+ <div>{table.getOrElse("No network receivers")}</div>
+
+ content
+ }
+
+ /** Generate stats of batch jobs of the streaming program */
+ private def generateBatchStatsTable(): Seq[Node] = {
+ val numBatches = listener.retainedCompletedBatches.size
+ val lastCompletedBatch = listener.lastCompletedBatch
+ val table = if (numBatches > 0) {
+ val processingDelayQuantilesRow = {
+ Seq(
+ "Processing Time",
+ formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
+ ) ++ getQuantiles(listener.processingDelayDistribution)
+ }
+ val schedulingDelayQuantilesRow = {
+ Seq(
+ "Scheduling Delay",
+ formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
+ ) ++ getQuantiles(listener.schedulingDelayDistribution)
+ }
+ val totalDelayQuantilesRow = {
+ Seq(
+ "Total Delay",
+ formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
+ ) ++ getQuantiles(listener.totalDelayDistribution)
+ }
+ val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
+ "Median", "75th percentile", "Maximum")
+ val dataRows: Seq[Seq[String]] = Seq(
+ processingDelayQuantilesRow,
+ schedulingDelayQuantilesRow,
+ totalDelayQuantilesRow
+ )
+ Some(listingTable(headerRow, dataRows))
+ } else {
+ None
+ }
+
+ val content =
+ <h5>Batch Processing Statistics</h5> ++
+ <div>
+ <ul class="unstyled">
+ {table.getOrElse("No statistics have been generated yet.")}
+ </ul>
+ </div>
+
+ content
+ }
+
+
+ /**
+ * Returns a human-readable string representing a duration such as "5 second 35 ms"
+ */
+ private def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ }
+
+ /** Get quantiles for any time distribution */
+ private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
+ timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
+ }
+
+ /** Generate HTML table from string data */
+ private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+ def generateDataRow(data: Seq[String]): Seq[Node] = {
+ <tr> {data.map(d => <td>{d}</td>)} </tr>
+ }
+ UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
new file mode 100644
index 0000000000..51448d15c6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.ui.WebUITab
+
+/** Spark Web UI tab that shows statistics of a streaming job */
+private[spark] class StreamingTab(ssc: StreamingContext)
+ extends WebUITab(ssc.sc.ui, "streaming") with Logging {
+
+ val parent = ssc.sc.ui
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val listener = new StreamingJobProgressListener(ssc)
+
+ ssc.addStreamingListener(listener)
+ attachPage(new StreamingPage(this))
+ parent.attachTab(this)
+}
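
Because StreamingContext now instantiates a StreamingTab (see the StreamingContext.scala change above) and the tab attaches itself to the parent SparkUI in its constructor, no user action is needed to enable the page. A minimal sketch, assuming a local master and a 1-second batch interval (both values are illustrative):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Creating the context is enough to attach the "streaming" tab to the app UI,
    // normally served at http://<driver>:4040/streaming once the SparkContext is up.
    val ssc = new StreamingContext("local[2]", "streaming-ui-demo", Seconds(1))
    // define input DStreams and output operations here, then:
    // ssc.start()
    // ssc.awaitTermination()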
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 389b23d4d5..952511d411 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is a server to test the network input stream */
-class TestServer() extends Logging {
+class TestServer(portToBind: Int = 0) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
- val serverSocket = new ServerSocket(0)
+ val serverSocket = new ServerSocket(portToBind)
val servingThread = new Thread() {
override def run() {
@@ -282,7 +282,7 @@ class TestServer() extends Logging {
def start() { servingThread.start() }
- def send(msg: String) { queue.add(msg) }
+ def send(msg: String) { queue.put(msg) }
def stop() { servingThread.interrupt() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9cc27ef7f0..efd0d22ecb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
-
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
new file mode 100644
index 0000000000..35538ec188
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.io.Source
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+class UISuite extends FunSuite {
+
+ test("streaming tab in spark UI") {
+ val ssc = new StreamingContext("local", "test", Seconds(1))
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+ // test if streaming tab exist
+ assert(html.toLowerCase.contains("streaming"))
+ // test if other Spark tabs still exist
+ assert(html.toLowerCase.contains("stages"))
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(
+ ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ assert(html.toLowerCase.contains("batch"))
+ assert(html.toLowerCase.contains("network"))
+ }
+ }
+}