diff options
Diffstat (limited to 'streaming/src')
16 files changed, 576 insertions, 88 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index a4e236c65f..ff5d0aaa3d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -17,29 +17,28 @@ package org.apache.spark.streaming -import scala.collection.mutable.Queue -import scala.collection.Map -import scala.reflect.ClassTag - import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Props -import akka.actor.SupervisorStrategy -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.Text +import scala.collection.Map +import scala.collection.mutable.Queue +import scala.reflect.ClassTag + +import akka.actor.{Props, SupervisorStrategy} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers._ import org.apache.spark.streaming.scheduler._ -import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.ui.StreamingTab +import org.apache.spark.util.MetadataCleaner /** * Main entry point for Spark Streaming functionality. It provides methods used to create @@ -158,6 +157,8 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter + private[streaming] val uiTab = new StreamingTab(this) + /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { type CheckpointState = Value diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index d043200f71..a7e5215437 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -353,15 +353,6 @@ abstract class DStream[T: ClassTag] ( dependencies.foreach(_.clearMetadata(time)) } - /* Adds metadata to the Stream while it is running. - * This method should be overwritten by sublcasses of InputDStream. - */ - private[streaming] def addMetadata(metadata: Any) { - if (metadata != null) { - logInfo("Dropping Metadata: " + metadata.toString) - } - } - /** * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of * this stream. This is an internal method that should not be called directly. This is diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index d19a635fe8..5a249706b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -17,24 +17,23 @@ package org.apache.spark.streaming.dstream -import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} import java.nio.ByteBuffer +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.Await -import scala.concurrent.duration._ import scala.reflect.ClassTag -import akka.actor.{Props, Actor} +import akka.actor.{Actor, Props} import akka.pattern.ask -import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} -import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.rdd.{RDD, BlockRDD} +import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} -import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} -import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming._ +import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver} +import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} +import org.apache.spark.util.{AkkaUtils, Utils} /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -49,8 +48,10 @@ import org.apache.spark.util.AkkaUtils abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { - // This is an unique identifier that is used to match the network receiver with the - // corresponding network input stream. + /** Keeps all received blocks information */ + private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] + + /** This is an unique identifier for the network input stream. */ val id = ssc.getNewNetworkStreamId() /** @@ -65,25 +66,44 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte def stop() {} + /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */ override def compute(validTime: Time): Option[RDD[T]] = { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // master failure if (validTime >= graph.startTime) { - val blockIds = ssc.scheduler.networkInputTracker.getBlocks(id, validTime) + val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id) + receivedBlockInfo(validTime) = blockInfo + val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } } + + /** Get information on received blocks. */ + private[streaming] def getReceivedBlockInfo(time: Time) = { + receivedBlockInfo(time) + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This + * implementation overrides the default implementation to clear received + * block information. + */ + private[streaming] override def clearMetadata(time: Time) { + super.clearMetadata(time) + val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration)) + receivedBlockInfo --= oldReceivedBlocks.keys + logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", ")) + } } private[streaming] sealed trait NetworkReceiverMessage -private[streaming] case class StopReceiver() extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) - extends NetworkReceiverMessage -private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage +private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage /** * Abstract class of a receiver that can be run on worker nodes to receive external data. See @@ -177,6 +197,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString }.mkString("\n") } + logInfo("Deregistering receiver " + streamId) val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout) Await.result(future, askTimeout) @@ -209,18 +230,28 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging /** * Push a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock( + blockId: StreamBlockId, + arrayBuffer: ArrayBuffer[T], + metadata: Any, + level: StorageLevel + ) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) - trackerActor ! AddBlocks(streamId, Array(blockId), metadata) + trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata)) logDebug("Pushed block " + blockId) } /** * Push a block (as bytes) into the block manager. */ - def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock( + blockId: StreamBlockId, + bytes: ByteBuffer, + metadata: Any, + level: StorageLevel + ) { env.blockManager.putBytes(blockId, bytes, level) - trackerActor ! AddBlocks(streamId, Array(blockId), metadata) + trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata)) } /** Set the ID of the DStream that this receiver is associated with */ @@ -232,9 +263,11 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging private class NetworkReceiverActor extends Actor { override def preStart() { - logInfo("Registered receiver " + streamId) - val future = trackerActor.ask(RegisterReceiver(streamId, self))(askTimeout) + val msg = RegisterReceiver( + streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self) + val future = trackerActor.ask(msg)(askTimeout) Await.result(future, askTimeout) + logInfo("Registered receiver " + streamId) } override def receive() = { @@ -253,7 +286,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging class BlockGenerator(storageLevel: StorageLevel) extends Serializable with Logging { - case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) + case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 7f3cd2f8eb..9c69a2a4e2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time */ case class BatchInfo( batchTime: Time, + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]], submissionTime: Long, processingStartTime: Option[Long], processingEndTime: Option[Long] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 92d885c4bc..e564eccba2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -201,7 +201,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => - jobScheduler.runJobs(time, graph.generateJobs(time)) + jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) ) // Restart the timer @@ -214,7 +214,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => - jobScheduler.runJobs(time, jobs) + val receivedBlockInfo = graph.getNetworkInputStreams.map { stream => + val streamId = stream.id + val receivedBlockInfo = stream.getReceivedBlockInfo(time) + (streamId, receivedBlockInfo) + }.toMap + jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 04e0a6a283..d9ada99b47 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -100,14 +100,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { logInfo("Stopped JobScheduler") } - def runJobs(time: Time, jobs: Seq[Job]) { - if (jobs.isEmpty) { - logInfo("No jobs added for time " + time) + def submitJobSet(jobSet: JobSet) { + if (jobSet.jobs.isEmpty) { + logInfo("No jobs added for time " + jobSet.time) } else { - val jobSet = new JobSet(time, jobs) - jobSets.put(time, jobSet) + jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) - logInfo("Added jobs for time " + time) + logInfo("Added jobs for time " + jobSet.time) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index fcf303aee6..a69d743621 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time * belong to the same batch. */ private[streaming] -case class JobSet(time: Time, jobs: Seq[Job]) { +case class JobSet( + time: Time, + jobs: Seq[Job], + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty + ) { private val incompleteJobs = new HashSet[Job]() private val submissionTime = System.currentTimeMillis() // when this jobset was submitted @@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) { def toBatchInfo: BatchInfo = { new BatchInfo( time, + receivedBlockInfo, submissionTime, if (processingStartTime >= 0 ) Some(processingStartTime) else None, if (processingEndTime >= 0 ) Some(processingEndTime) else None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index 067e804202..a1e6f51768 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -17,20 +17,42 @@ package org.apache.spark.streaming.scheduler -import scala.collection.mutable.{HashMap, Queue, SynchronizedMap} +import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue} import akka.actor._ + import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.SparkContext._ -import org.apache.spark.storage.BlockId +import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver} import org.apache.spark.util.AkkaUtils +/** Information about receiver */ +case class ReceiverInfo(streamId: Int, typ: String, location: String) { + override def toString = s"$typ-$streamId" +} + +/** Information about blocks received by the network receiver */ +case class ReceivedBlockInfo( + streamId: Int, + blockId: StreamBlockId, + numRecords: Long, + metadata: Any + ) + +/** + * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate + * with each other. + */ private[streaming] sealed trait NetworkInputTrackerMessage -private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) - extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) +private[streaming] case class RegisterReceiver( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef + ) extends NetworkInputTrackerMessage +private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage @@ -47,9 +69,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverExecutor() val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef] - val receivedBlockIds = new HashMap[Int, Queue[BlockId]] with SynchronizedMap[Int, Queue[BlockId]] + val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] + with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] val timeout = AkkaUtils.askTimeout(ssc.conf) - + val listenerBus = ssc.scheduler.listenerBus // actor is created when generator starts. // This not being null means the tracker has been started and not stopped @@ -83,12 +106,32 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } } + /** Return all the blocks received from a receiver. */ + def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = { + val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true) + logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks") + receivedBlockInfo.toArray + } + + private def getReceivedBlockInfoQueue(streamId: Int) = { + receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo]) + } + /** Register a receiver */ - def registerReceiver(streamId: Int, receiverActor: ActorRef, sender: ActorRef) { + def registerReceiver( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef, + sender: ActorRef + ) { if (!networkInputStreamMap.contains(streamId)) { throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted( + ReceiverInfo(streamId, typ, host) + )) logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) } @@ -98,35 +141,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message) } - /** Get all the received blocks for the given stream. */ - def getBlocks(streamId: Int, time: Time): Array[BlockId] = { - val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]()) - val result = queue.dequeueAll(x => true).toArray - logInfo("Stream " + streamId + " received " + result.size + " blocks") - result - } - /** Add new blocks for the given stream */ - def addBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) = { - val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]) - queue ++= blockIds - networkInputStreamMap(streamId).addMetadata(metadata) - logDebug("Stream " + streamId + " received new blocks: " + blockIds.mkString("[", ", ", "]")) + def addBlocks(receivedBlockInfo: ReceivedBlockInfo) { + getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo + logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " + + receivedBlockInfo.blockId) } /** Check if any blocks are left to be processed */ def hasMoreReceivedBlockIds: Boolean = { - !receivedBlockIds.forall(_._2.isEmpty) + !receivedBlockInfo.values.forall(_.isEmpty) } /** Actor to receive messages from the receivers. */ private class NetworkInputTrackerActor extends Actor { def receive = { - case RegisterReceiver(streamId, receiverActor) => - registerReceiver(streamId, receiverActor, sender) + case RegisterReceiver(streamId, typ, host, receiverActor) => + registerReceiver(streamId, typ, host, receiverActor, sender) sender ! true - case AddBlocks(streamId, blockIds, metadata) => - addBlocks(streamId, blockIds, metadata) + case AddBlock(receivedBlockInfo) => + addBlocks(receivedBlockInfo) case DeregisterReceiver(streamId, message) => deregisterReceiver(streamId, message) sender ! true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 461ea35064..5db40ebbeb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -23,8 +23,11 @@ import org.apache.spark.util.Distribution /** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent +case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent +case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) + extends StreamingListenerEvent /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent @@ -34,14 +37,17 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen * computation. */ trait StreamingListener { - /** - * Called when processing of a batch has completed - */ + + /** Called when a receiver has been started */ + def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } + + /** Called when a batch of jobs has been submitted for processing. */ + def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } + + /** Called when processing of a batch of jobs has completed. */ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } - /** - * Called when processing of a batch has started - */ + /** Called when processing of a batch of jobs has started. */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 18811fc2b0..ea03dfc7bf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging { while (true) { val event = eventQueue.take event match { + case receiverStarted: StreamingListenerReceiverStarted => + listeners.foreach(_.onReceiverStarted(receiverStarted)) + case batchSubmitted: StreamingListenerBatchSubmitted => + listeners.foreach(_.onBatchSubmitted(batchSubmitted)) case batchStarted: StreamingListenerBatchStarted => listeners.foreach(_.onBatchStarted(batchStarted)) case batchCompleted: StreamingListenerBatchCompleted => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala new file mode 100644 index 0000000000..8b025b09ed --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import org.apache.spark.streaming.{Time, StreamingContext} +import org.apache.spark.streaming.scheduler._ +import scala.collection.mutable.{Queue, HashMap} +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted +import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted +import org.apache.spark.streaming.scheduler.BatchInfo +import org.apache.spark.streaming.scheduler.ReceiverInfo +import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted +import org.apache.spark.util.Distribution + + +private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener { + + private val waitingBatchInfos = new HashMap[Time, BatchInfo] + private val runningBatchInfos = new HashMap[Time, BatchInfo] + private val completedaBatchInfos = new Queue[BatchInfo] + private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) + private var totalCompletedBatches = 0L + private val receiverInfos = new HashMap[Int, ReceiverInfo] + + val batchDuration = ssc.graph.batchDuration.milliseconds + + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = { + synchronized { + receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo) + } + } + + override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized { + runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo + } + + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized { + runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo + waitingBatchInfos.remove(batchStarted.batchInfo.batchTime) + } + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized { + waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime) + runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) + completedaBatchInfos.enqueue(batchCompleted.batchInfo) + if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() + totalCompletedBatches += 1L + } + + def numNetworkReceivers = synchronized { + ssc.graph.getNetworkInputStreams().size + } + + def numTotalCompletedBatches: Long = synchronized { + totalCompletedBatches + } + + def numUnprocessedBatches: Long = synchronized { + waitingBatchInfos.size + runningBatchInfos.size + } + + def waitingBatches: Seq[BatchInfo] = synchronized { + waitingBatchInfos.values.toSeq + } + + def runningBatches: Seq[BatchInfo] = synchronized { + runningBatchInfos.values.toSeq + } + + def retainedCompletedBatches: Seq[BatchInfo] = synchronized { + completedaBatchInfos.toSeq + } + + def processingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.processingDelay) + } + + def schedulingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.schedulingDelay) + } + + def totalDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.totalDelay) + } + + def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized { + val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit) + val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) + (0 until numNetworkReceivers).map { receiverId => + val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo => + batchInfo.get(receiverId).getOrElse(Array.empty) + } + val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo => + // calculate records per second for each batch + blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration + } + val distributionOption = Distribution(recordsOfParticularReceiver) + (receiverId, distributionOption) + }.toMap + } + + def lastReceivedBatchRecords: Map[Int, Long] = { + val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo) + lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => + (0 until numNetworkReceivers).map { receiverId => + (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum) + }.toMap + }.getOrElse { + (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap + } + } + + def receiverInfo(receiverId: Int): Option[ReceiverInfo] = { + receiverInfos.get(receiverId) + } + + def lastCompletedBatch: Option[BatchInfo] = { + completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption + } + + def lastReceivedBatch: Option[BatchInfo] = { + retainedBatches.lastOption + } + + private def retainedBatches: Seq[BatchInfo] = synchronized { + (waitingBatchInfos.values.toSeq ++ + runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering) + } + + private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala new file mode 100644 index 0000000000..6607437db5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import java.util.Calendar +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.Logging +import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.util.Distribution + +/** Page for Spark Web UI that shows statistics of a streaming job */ +private[ui] class StreamingPage(parent: StreamingTab) + extends WebUIPage("") with Logging { + + private val listener = parent.listener + private val startTime = Calendar.getInstance().getTime() + private val emptyCell = "-" + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val content = + generateBasicStats() ++ <br></br> ++ + <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++ + generateNetworkStatsTable() ++ + generateBatchStatsTable() + UIUtils.headerSparkPage( + content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000)) + } + + /** Generate basic stats of the streaming program */ + private def generateBasicStats(): Seq[Node] = { + val timeSinceStart = System.currentTimeMillis() - startTime.getTime + <ul class ="unstyled"> + <li> + <strong>Started at: </strong> {startTime.toString} + </li> + <li> + <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)} + </li> + <li> + <strong>Network receivers: </strong>{listener.numNetworkReceivers} + </li> + <li> + <strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)} + </li> + <li> + <strong>Processed batches: </strong>{listener.numTotalCompletedBatches} + </li> + <li> + <strong>Waiting batches: </strong>{listener.numUnprocessedBatches} + </li> + </ul> + } + + /** Generate stats of data received over the network the streaming program */ + private def generateNetworkStatsTable(): Seq[Node] = { + val receivedRecordDistributions = listener.receivedRecordsDistributions + val lastBatchReceivedRecord = listener.lastReceivedBatchRecords + val table = if (receivedRecordDistributions.size > 0) { + val headerRow = Seq( + "Receiver", + "Location", + "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]", + "Minimum rate\n[records/sec]", + "25th percentile rate\n[records/sec]", + "Median rate\n[records/sec]", + "75th percentile rate\n[records/sec]", + "Maximum rate\n[records/sec]" + ) + val dataRows = (0 until listener.numNetworkReceivers).map { receiverId => + val receiverInfo = listener.receiverInfo(receiverId) + val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId") + val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell) + val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId)) + val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => + d.getQuantiles().map(r => formatDurationVerbose(r.toLong)) + }.getOrElse { + Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell) + } + Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats + } + Some(listingTable(headerRow, dataRows)) + } else { + None + } + + val content = + <h5>Network Input Statistics</h5> ++ + <div>{table.getOrElse("No network receivers")}</div> + + content + } + + /** Generate stats of batch jobs of the streaming program */ + private def generateBatchStatsTable(): Seq[Node] = { + val numBatches = listener.retainedCompletedBatches.size + val lastCompletedBatch = listener.lastCompletedBatch + val table = if (numBatches > 0) { + val processingDelayQuantilesRow = { + Seq( + "Processing Time", + formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay)) + ) ++ getQuantiles(listener.processingDelayDistribution) + } + val schedulingDelayQuantilesRow = { + Seq( + "Scheduling Delay", + formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay)) + ) ++ getQuantiles(listener.schedulingDelayDistribution) + } + val totalDelayQuantilesRow = { + Seq( + "Total Delay", + formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay)) + ) ++ getQuantiles(listener.totalDelayDistribution) + } + val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile", + "Median", "75th percentile", "Maximum") + val dataRows: Seq[Seq[String]] = Seq( + processingDelayQuantilesRow, + schedulingDelayQuantilesRow, + totalDelayQuantilesRow + ) + Some(listingTable(headerRow, dataRows)) + } else { + None + } + + val content = + <h5>Batch Processing Statistics</h5> ++ + <div> + <ul class="unstyled"> + {table.getOrElse("No statistics have been generated yet.")} + </ul> + </div> + + content + } + + + /** + * Returns a human-readable string representing a duration such as "5 second 35 ms" + */ + private def formatDurationOption(msOption: Option[Long]): String = { + msOption.map(formatDurationVerbose).getOrElse(emptyCell) + } + + /** Get quantiles for any time distribution */ + private def getQuantiles(timeDistributionOption: Option[Distribution]) = { + timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) } + } + + /** Generate HTML table from string data */ + private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = { + def generateDataRow(data: Seq[String]): Seq[Node] = { + <tr> {data.map(d => <td>{d}</td>)} </tr> + } + UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala new file mode 100644 index 0000000000..51448d15c6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import org.apache.spark.Logging +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.ui.WebUITab + +/** Spark Web UI tab that shows statistics of a streaming job */ +private[spark] class StreamingTab(ssc: StreamingContext) + extends WebUITab(ssc.sc.ui, "streaming") with Logging { + + val parent = ssc.sc.ui + val appName = parent.appName + val basePath = parent.basePath + val listener = new StreamingJobProgressListener(ssc) + + ssc.addStreamingListener(listener) + attachPage(new StreamingPage(this)) + parent.attachTab(this) +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 389b23d4d5..952511d411 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is a server to test the network input stream */ -class TestServer() extends Logging { +class TestServer(portToBind: Int = 0) extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(0) + val serverSocket = new ServerSocket(portToBind) val servingThread = new Thread() { override def run() { @@ -282,7 +282,7 @@ class TestServer() extends Logging { def start() { servingThread.start() } - def send(msg: String) { queue.add(msg) } + def send(msg: String) { queue.put(msg) } def stop() { servingThread.interrupt() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9cc27ef7f0..efd0d22ecb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } - test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) sc = ssc.sparkContext diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala new file mode 100644 index 0000000000..35538ec188 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import scala.io.Source + +import org.scalatest.FunSuite +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ + +class UISuite extends FunSuite { + + test("streaming tab in spark UI") { + val ssc = new StreamingContext("local", "test", Seconds(1)) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString + assert(!html.contains("random data that should not be present")) + // test if streaming tab exist + assert(html.toLowerCase.contains("streaming")) + // test if other Spark tabs still exist + assert(html.toLowerCase.contains("stages")) + } + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL( + ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString + assert(html.toLowerCase.contains("batch")) + assert(html.toLowerCase.contains("network")) + } + } +} |