Diffstat (limited to 'streaming/src')
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                  |  23
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                   |   9
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala       |  79
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala               |   1
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala            |   9
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala            |  11
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala                  |   7
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala     |  86
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala       |  18
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala    |   4
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala   | 148
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala                  | 180
 -rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala                   |  36
 -rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala                 |   6
 -rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala             |   1
 -rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala                           |  46
 16 files changed, 576 insertions(+), 88 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a4e236c65f..ff5d0aaa3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,29 +17,28 @@
package org.apache.spark.streaming
-import scala.collection.mutable.Queue
-import scala.collection.Map
-import scala.reflect.ClassTag
-
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
+import scala.collection.Map
+import scala.collection.mutable.Queue
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
-import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.util.MetadataCleaner
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -158,6 +157,8 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
+ private[streaming] val uiTab = new StreamingTab(this)
+
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value
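
The new uiTab field means that simply constructing a StreamingContext now attaches a "Streaming" tab to the application's Spark web UI. A minimal driver sketch that would exercise it; the socket host and port are placeholders, not part of this change:

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingTabExample {
  def main(args: Array[String]) {
    // Constructing the context also constructs StreamingTab(this) and attaches it to the Spark UI.
    val ssc = new StreamingContext("local[2]", "StreamingTabExample", Seconds(1))

    // Any input stream will do; localhost:9999 is only an illustrative socket source.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()
    // While the application runs, the tab is served under the app UI, e.g. http://<driver>:4040/streaming.
    ssc.awaitTermination()
  }
}
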
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d043200f71..a7e5215437 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -353,15 +353,6 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.clearMetadata(time))
}
- /* Adds metadata to the Stream while it is running.
- * This method should be overwritten by sublcasses of InputDStream.
- */
- private[streaming] def addMetadata(metadata: Any) {
- if (metadata != null) {
- logInfo("Dropping Metadata: " + metadata.toString)
- }
- }
-
/**
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d19a635fe8..5a249706b4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -17,24 +17,23 @@
package org.apache.spark.streaming.dstream
-import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import java.nio.ByteBuffer
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.Await
-import scala.concurrent.duration._
import scala.reflect.ClassTag
-import akka.actor.{Props, Actor}
+import akka.actor.{Actor, Props}
import akka.pattern.ask
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
-import org.apache.spark.streaming._
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.rdd.{RDD, BlockRDD}
+import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -49,8 +48,10 @@ import org.apache.spark.util.AkkaUtils
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
- // This is an unique identifier that is used to match the network receiver with the
- // corresponding network input stream.
+ /** Keeps all received blocks information */
+ private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+
+ /** This is an unique identifier for the network input stream. */
val id = ssc.getNewNetworkStreamId()
/**
@@ -65,25 +66,44 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
def stop() {}
+ /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
- val blockIds = ssc.scheduler.networkInputTracker.getBlocks(id, validTime)
+ val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
+ receivedBlockInfo(validTime) = blockInfo
+ val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
+
+ /** Get information on received blocks. */
+ private[streaming] def getReceivedBlockInfo(time: Time) = {
+ receivedBlockInfo(time)
+ }
+
+ /**
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This
+ * implementation overrides the default implementation to clear received
+ * block information.
+ */
+ private[streaming] override def clearMetadata(time: Time) {
+ super.clearMetadata(time)
+ val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
+ receivedBlockInfo --= oldReceivedBlocks.keys
+ logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+ }
}
private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] case class StopReceiver() extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
- extends NetworkReceiverMessage
-private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
/**
* Abstract class of a receiver that can be run on worker nodes to receive external data. See
@@ -177,6 +197,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
}.mkString("\n")
}
+
logInfo("Deregistering receiver " + streamId)
val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
Await.result(future, askTimeout)
@@ -209,18 +230,28 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/**
* Push a block (as an ArrayBuffer filled with data) into the block manager.
*/
- def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+ def pushBlock(
+ blockId: StreamBlockId,
+ arrayBuffer: ArrayBuffer[T],
+ metadata: Any,
+ level: StorageLevel
+ ) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
- trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
logDebug("Pushed block " + blockId)
}
/**
* Push a block (as bytes) into the block manager.
*/
- def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
+ def pushBlock(
+ blockId: StreamBlockId,
+ bytes: ByteBuffer,
+ metadata: Any,
+ level: StorageLevel
+ ) {
env.blockManager.putBytes(blockId, bytes, level)
- trackerActor ! AddBlocks(streamId, Array(blockId), metadata)
+ trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
}
/** Set the ID of the DStream that this receiver is associated with */
@@ -232,9 +263,11 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
private class NetworkReceiverActor extends Actor {
override def preStart() {
- logInfo("Registered receiver " + streamId)
- val future = trackerActor.ask(RegisterReceiver(streamId, self))(askTimeout)
+ val msg = RegisterReceiver(
+ streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
+ val future = trackerActor.ask(msg)(askTimeout)
Await.result(future, askTimeout)
+ logInfo("Registered receiver " + streamId)
}
override def receive() = {
@@ -253,7 +286,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {
- case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
+ case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
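
On the receiver side, the practical effect of this change is that every pushed block is now reported to the tracker as a ReceivedBlockInfo (stream id, StreamBlockId, record count, metadata) via the AddBlock message rather than as a bare block id. Below is a rough sketch of a custom receiver in the style of SocketReceiver, using placeholder data; nothing in it is part of the patch itself, and the BlockGenerator calls assume the existing generator API:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkReceiver

// Hypothetical receiver; the records below stand in for data read from an external source.
class SketchReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String] {

  // Batches records into blocks; each block the generator pushes is reported to the
  // NetworkInputTracker as AddBlock(ReceivedBlockInfo(streamId, blockId, numRecords, metadata)).
  lazy protected val blockGenerator = new BlockGenerator(storageLevel)

  override def getLocationPreference = None

  protected def onStart() {
    blockGenerator.start()
    Seq("record-1", "record-2", "record-3").foreach { record =>
      blockGenerator += record
    }
  }

  protected def onStop() {
    blockGenerator.stop()
  }
}
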
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 7f3cd2f8eb..9c69a2a4e2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
*/
case class BatchInfo(
batchTime: Time,
+ receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 92d885c4bc..e564eccba2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -201,7 +201,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
- jobScheduler.runJobs(time, graph.generateJobs(time))
+ jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)
// Restart the timer
@@ -214,7 +214,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
- jobScheduler.runJobs(time, jobs)
+ val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
+ val streamId = stream.id
+ val receivedBlockInfo = stream.getReceivedBlockInfo(time)
+ (streamId, receivedBlockInfo)
+ }.toMap
+ jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 04e0a6a283..d9ada99b47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -100,14 +100,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
logInfo("Stopped JobScheduler")
}
- def runJobs(time: Time, jobs: Seq[Job]) {
- if (jobs.isEmpty) {
- logInfo("No jobs added for time " + time)
+ def submitJobSet(jobSet: JobSet) {
+ if (jobSet.jobs.isEmpty) {
+ logInfo("No jobs added for time " + jobSet.time)
} else {
- val jobSet = new JobSet(time, jobs)
- jobSets.put(time, jobSet)
+ jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
- logInfo("Added jobs for time " + time)
+ logInfo("Added jobs for time " + jobSet.time)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index fcf303aee6..a69d743621 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time
* belong to the same batch.
*/
private[streaming]
-case class JobSet(time: Time, jobs: Seq[Job]) {
+case class JobSet(
+ time: Time,
+ jobs: Seq[Job],
+ receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
+ ) {
private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
+ receivedBlockInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 067e804202..a1e6f51768 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -17,20 +17,42 @@
package org.apache.spark.streaming.scheduler
-import scala.collection.mutable.{HashMap, Queue, SynchronizedMap}
+import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import akka.actor._
+
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
import org.apache.spark.util.AkkaUtils
+/** Information about receiver */
+case class ReceiverInfo(streamId: Int, typ: String, location: String) {
+ override def toString = s"$typ-$streamId"
+}
+
+/** Information about blocks received by the network receiver */
+case class ReceivedBlockInfo(
+ streamId: Int,
+ blockId: StreamBlockId,
+ numRecords: Long,
+ metadata: Any
+ )
+
+/**
+ * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
+ * with each other.
+ */
private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
- extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+private[streaming] case class RegisterReceiver(
+ streamId: Int,
+ typ: String,
+ host: String,
+ receiverActor: ActorRef
+ ) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
extends NetworkInputTrackerMessage
@@ -47,9 +69,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
- val receivedBlockIds = new HashMap[Int, Queue[BlockId]] with SynchronizedMap[Int, Queue[BlockId]]
+ val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
+ with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)
-
+ val listenerBus = ssc.scheduler.listenerBus
// actor is created when generator starts.
// This not being null means the tracker has been started and not stopped
@@ -83,12 +106,32 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}
}
+ /** Return all the blocks received from a receiver. */
+ def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
+ val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
+ logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
+ receivedBlockInfo.toArray
+ }
+
+ private def getReceivedBlockInfoQueue(streamId: Int) = {
+ receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
+ }
+
/** Register a receiver */
- def registerReceiver(streamId: Int, receiverActor: ActorRef, sender: ActorRef) {
+ def registerReceiver(
+ streamId: Int,
+ typ: String,
+ host: String,
+ receiverActor: ActorRef,
+ sender: ActorRef
+ ) {
if (!networkInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
+ ReceiverInfo(streamId, typ, host)
+ ))
logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
}
@@ -98,35 +141,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message)
}
- /** Get all the received blocks for the given stream. */
- def getBlocks(streamId: Int, time: Time): Array[BlockId] = {
- val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]())
- val result = queue.dequeueAll(x => true).toArray
- logInfo("Stream " + streamId + " received " + result.size + " blocks")
- result
- }
-
/** Add new blocks for the given stream */
- def addBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) = {
- val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId])
- queue ++= blockIds
- networkInputStreamMap(streamId).addMetadata(metadata)
- logDebug("Stream " + streamId + " received new blocks: " + blockIds.mkString("[", ", ", "]"))
+ def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
+ getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
+ logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
+ receivedBlockInfo.blockId)
}
/** Check if any blocks are left to be processed */
def hasMoreReceivedBlockIds: Boolean = {
- !receivedBlockIds.forall(_._2.isEmpty)
+ !receivedBlockInfo.values.forall(_.isEmpty)
}
/** Actor to receive messages from the receivers. */
private class NetworkInputTrackerActor extends Actor {
def receive = {
- case RegisterReceiver(streamId, receiverActor) =>
- registerReceiver(streamId, receiverActor, sender)
+ case RegisterReceiver(streamId, typ, host, receiverActor) =>
+ registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
- case AddBlocks(streamId, blockIds, metadata) =>
- addBlocks(streamId, blockIds, metadata)
+ case AddBlock(receivedBlockInfo) =>
+ addBlocks(receivedBlockInfo)
case DeregisterReceiver(streamId, message) =>
deregisterReceiver(streamId, message)
sender ! true
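
For reference, ReceiverInfo.toString is what later labels the receiver on the Streaming UI page, so a receiver of type "SocketReceiver" on stream 0 shows up as "SocketReceiver-0". A tiny illustration with made-up values, e.g. in a Scala REPL:

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, ReceiverInfo}

// Illustrative values only: stream 0, a SocketReceiver running on localhost.
val info = ReceiverInfo(streamId = 0, typ = "SocketReceiver", location = "localhost")
assert(info.toString == "SocketReceiver-0")

// The per-block record count (here 500) is what feeds the records/sec statistics in the UI;
// StreamBlockId(0, 1L) is a placeholder block id.
val blockInfo = ReceivedBlockInfo(streamId = 0, blockId = StreamBlockId(0, 1L), numRecords = 500, metadata = null)
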
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 461ea35064..5db40ebbeb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -23,8 +23,11 @@ import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
sealed trait StreamingListenerEvent
+case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
+ extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
@@ -34,14 +37,17 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
* computation.
*/
trait StreamingListener {
- /**
- * Called when processing of a batch has completed
- */
+
+ /** Called when a receiver has been started */
+ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
- /**
- * Called when processing of a batch has started
- */
+ /** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
}
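
To show how the expanded listener interface could be consumed, here is a sketch of a user-defined listener that reacts to the new receiver-started and batch-submitted events alongside the existing ones; the class name and println-based reporting are illustrative only:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler._

// Illustrative listener exercising the new callbacks alongside the existing ones.
class LoggingStreamingListener extends StreamingListener {

  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
    val info = receiverStarted.receiverInfo
    println(s"Receiver $info started on ${info.location}")
  }

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
    println(s"Batch ${batchSubmitted.batchInfo.batchTime} submitted")
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val processingDelay = batchCompleted.batchInfo.processingDelay.getOrElse(-1L)
    println(s"Batch ${batchCompleted.batchInfo.batchTime} done in $processingDelay ms")
  }
}

// Registration on an existing StreamingContext ssc:
//   ssc.addStreamingListener(new LoggingStreamingListener)
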
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 18811fc2b0..ea03dfc7bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging {
while (true) {
val event = eventQueue.take
event match {
+ case receiverStarted: StreamingListenerReceiverStarted =>
+ listeners.foreach(_.onReceiverStarted(receiverStarted))
+ case batchSubmitted: StreamingListenerBatchSubmitted =>
+ listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
new file mode 100644
index 0000000000..8b025b09ed
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.{Time, StreamingContext}
+import org.apache.spark.streaming.scheduler._
+import scala.collection.mutable.{Queue, HashMap}
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.scheduler.ReceiverInfo
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
+import org.apache.spark.util.Distribution
+
+
+private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
+
+ private val waitingBatchInfos = new HashMap[Time, BatchInfo]
+ private val runningBatchInfos = new HashMap[Time, BatchInfo]
+ private val completedaBatchInfos = new Queue[BatchInfo]
+ private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ private var totalCompletedBatches = 0L
+ private val receiverInfos = new HashMap[Int, ReceiverInfo]
+
+ val batchDuration = ssc.graph.batchDuration.milliseconds
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
+ synchronized {
+ receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
+ }
+ }
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
+ runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
+ runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
+ waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
+ waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ completedaBatchInfos.enqueue(batchCompleted.batchInfo)
+ if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+ totalCompletedBatches += 1L
+ }
+
+ def numNetworkReceivers = synchronized {
+ ssc.graph.getNetworkInputStreams().size
+ }
+
+ def numTotalCompletedBatches: Long = synchronized {
+ totalCompletedBatches
+ }
+
+ def numUnprocessedBatches: Long = synchronized {
+ waitingBatchInfos.size + runningBatchInfos.size
+ }
+
+ def waitingBatches: Seq[BatchInfo] = synchronized {
+ waitingBatchInfos.values.toSeq
+ }
+
+ def runningBatches: Seq[BatchInfo] = synchronized {
+ runningBatchInfos.values.toSeq
+ }
+
+ def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
+ completedaBatchInfos.toSeq
+ }
+
+ def processingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.processingDelay)
+ }
+
+ def schedulingDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.schedulingDelay)
+ }
+
+ def totalDelayDistribution: Option[Distribution] = synchronized {
+ extractDistribution(_.totalDelay)
+ }
+
+ def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
+ val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
+ val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
+ (0 until numNetworkReceivers).map { receiverId =>
+ val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
+ batchInfo.get(receiverId).getOrElse(Array.empty)
+ }
+ val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
+ // calculate records per second for each batch
+ blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
+ }
+ val distributionOption = Distribution(recordsOfParticularReceiver)
+ (receiverId, distributionOption)
+ }.toMap
+ }
+
+ def lastReceivedBatchRecords: Map[Int, Long] = {
+ val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
+ lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
+ (0 until numNetworkReceivers).map { receiverId =>
+ (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+ }.toMap
+ }.getOrElse {
+ (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
+ }
+ }
+
+ def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+ receiverInfos.get(receiverId)
+ }
+
+ def lastCompletedBatch: Option[BatchInfo] = {
+ completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+ }
+
+ def lastReceivedBatch: Option[BatchInfo] = {
+ retainedBatches.lastOption
+ }
+
+ private def retainedBatches: Seq[BatchInfo] = synchronized {
+ (waitingBatchInfos.values.toSeq ++
+ runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
+ }
+
+ private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+ Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ }
+}
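
The per-receiver rate computed in receivedRecordsDistributions is records per second, derived per batch as numRecords * 1000 / batchDuration (in milliseconds). A quick worked check of that arithmetic with made-up figures, e.g. pasted into a Scala REPL:

// Suppose a stream's blocks in one batch total 10,000 records and the batch interval is 2 s.
val numRecords = 10000L
val batchDurationMs = 2000L
val recordsPerSecond = numRecords.toDouble * 1000 / batchDurationMs
assert(recordsPerSecond == 5000.0)  // 10,000 records over 2 seconds => 5,000 records/sec
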
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
new file mode 100644
index 0000000000..6607437db5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.Logging
+import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.util.Distribution
+
+/** Page for Spark Web UI that shows statistics of a streaming job */
+private[ui] class StreamingPage(parent: StreamingTab)
+ extends WebUIPage("") with Logging {
+
+ private val listener = parent.listener
+ private val startTime = Calendar.getInstance().getTime()
+ private val emptyCell = "-"
+
+ /** Render the page */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val content =
+ generateBasicStats() ++ <br></br> ++
+ <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
+ generateNetworkStatsTable() ++
+ generateBatchStatsTable()
+ UIUtils.headerSparkPage(
+ content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
+ }
+
+ /** Generate basic stats of the streaming program */
+ private def generateBasicStats(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+ <ul class ="unstyled">
+ <li>
+ <strong>Started at: </strong> {startTime.toString}
+ </li>
+ <li>
+ <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+ </li>
+ <li>
+ <strong>Network receivers: </strong>{listener.numNetworkReceivers}
+ </li>
+ <li>
+ <strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
+ </li>
+ <li>
+ <strong>Processed batches: </strong>{listener.numTotalCompletedBatches}
+ </li>
+ <li>
+ <strong>Waiting batches: </strong>{listener.numUnprocessedBatches}
+ </li>
+ </ul>
+ }
+
+ /** Generate stats of data received over the network the streaming program */
+ private def generateNetworkStatsTable(): Seq[Node] = {
+ val receivedRecordDistributions = listener.receivedRecordsDistributions
+ val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
+ val table = if (receivedRecordDistributions.size > 0) {
+ val headerRow = Seq(
+ "Receiver",
+ "Location",
+ "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
+ "Minimum rate\n[records/sec]",
+ "25th percentile rate\n[records/sec]",
+ "Median rate\n[records/sec]",
+ "75th percentile rate\n[records/sec]",
+ "Maximum rate\n[records/sec]"
+ )
+ val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
+ val receiverInfo = listener.receiverInfo(receiverId)
+ val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
+ val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
+ val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
+ val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
+ d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
+ }.getOrElse {
+ Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
+ }
+ Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
+ }
+ Some(listingTable(headerRow, dataRows))
+ } else {
+ None
+ }
+
+ val content =
+ <h5>Network Input Statistics</h5> ++
+ <div>{table.getOrElse("No network receivers")}</div>
+
+ content
+ }
+
+ /** Generate stats of batch jobs of the streaming program */
+ private def generateBatchStatsTable(): Seq[Node] = {
+ val numBatches = listener.retainedCompletedBatches.size
+ val lastCompletedBatch = listener.lastCompletedBatch
+ val table = if (numBatches > 0) {
+ val processingDelayQuantilesRow = {
+ Seq(
+ "Processing Time",
+ formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
+ ) ++ getQuantiles(listener.processingDelayDistribution)
+ }
+ val schedulingDelayQuantilesRow = {
+ Seq(
+ "Scheduling Delay",
+ formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
+ ) ++ getQuantiles(listener.schedulingDelayDistribution)
+ }
+ val totalDelayQuantilesRow = {
+ Seq(
+ "Total Delay",
+ formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
+ ) ++ getQuantiles(listener.totalDelayDistribution)
+ }
+ val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
+ "Median", "75th percentile", "Maximum")
+ val dataRows: Seq[Seq[String]] = Seq(
+ processingDelayQuantilesRow,
+ schedulingDelayQuantilesRow,
+ totalDelayQuantilesRow
+ )
+ Some(listingTable(headerRow, dataRows))
+ } else {
+ None
+ }
+
+ val content =
+ <h5>Batch Processing Statistics</h5> ++
+ <div>
+ <ul class="unstyled">
+ {table.getOrElse("No statistics have been generated yet.")}
+ </ul>
+ </div>
+
+ content
+ }
+
+
+ /**
+ * Returns a human-readable string representing a duration such as "5 second 35 ms"
+ */
+ private def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+ }
+
+ /** Get quantiles for any time distribution */
+ private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
+ timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
+ }
+
+ /** Generate HTML table from string data */
+ private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+ def generateDataRow(data: Seq[String]): Seq[Node] = {
+ <tr> {data.map(d => <td>{d}</td>)} </tr>
+ }
+ UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
new file mode 100644
index 0000000000..51448d15c6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.ui.WebUITab
+
+/** Spark Web UI tab that shows statistics of a streaming job */
+private[spark] class StreamingTab(ssc: StreamingContext)
+ extends WebUITab(ssc.sc.ui, "streaming") with Logging {
+
+ val parent = ssc.sc.ui
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val listener = new StreamingJobProgressListener(ssc)
+
+ ssc.addStreamingListener(listener)
+ attachPage(new StreamingPage(this))
+ parent.attachTab(this)
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 389b23d4d5..952511d411 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is a server to test the network input stream */
-class TestServer() extends Logging {
+class TestServer(portToBind: Int = 0) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
- val serverSocket = new ServerSocket(0)
+ val serverSocket = new ServerSocket(portToBind)
val servingThread = new Thread() {
override def run() {
@@ -282,7 +282,7 @@ class TestServer() extends Logging {
def start() { servingThread.start() }
- def send(msg: String) { queue.add(msg) }
+ def send(msg: String) { queue.put(msg) }
def stop() { servingThread.interrupt() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9cc27ef7f0..efd0d22ecb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
-
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
new file mode 100644
index 0000000000..35538ec188
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.io.Source
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+class UISuite extends FunSuite {
+
+ test("streaming tab in spark UI") {
+ val ssc = new StreamingContext("local", "test", Seconds(1))
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+ // test if streaming tab exist
+ assert(html.toLowerCase.contains("streaming"))
+ // test if other Spark tabs still exist
+ assert(html.toLowerCase.contains("stages"))
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(
+ ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ assert(html.toLowerCase.contains("batch"))
+ assert(html.toLowerCase.contains("network"))
+ }
+ }
+}