From 1b2c2162af4d5d2d950af94571e69273b49bf913 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Thu, 7 Jan 2016 21:12:57 +0000 Subject: [STREAMING][MINOR] More contextual information in logs + minor code i… MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …mprovements Please review and merge at your convenience. Thanks! Author: Jacek Laskowski Closes #10595 from jaceklaskowski/streaming-minor-fixes. --- .../apache/spark/streaming/StreamingContext.scala | 12 +-- .../apache/spark/streaming/dstream/DStream.scala | 86 +++++++++++----------- .../spark/streaming/dstream/InputDStream.scala | 3 +- .../streaming/dstream/ReceiverInputDStream.scala | 4 +- .../streaming/receiver/ReceivedBlockHandler.scala | 2 +- .../apache/spark/streaming/receiver/Receiver.scala | 4 +- .../streaming/receiver/ReceiverSupervisor.scala | 8 +- .../apache/spark/streaming/scheduler/JobSet.scala | 8 +- 8 files changed, 61 insertions(+), 66 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index a5ab666975..ca0a21fbb7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -226,7 +226,7 @@ class StreamingContext private[streaming] ( * Set the context to periodically checkpoint the DStream operations for driver * fault-tolerance. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored. - * Note that this must be a fault-tolerant file system like HDFS for + * Note that this must be a fault-tolerant file system like HDFS. */ def checkpoint(directory: String) { if (directory != null) { @@ -274,7 +274,7 @@ class StreamingContext private[streaming] ( * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver * - * @deprecated As of 1.0.0", replaced by `receiverStream`. + * @deprecated As of 1.0.0 replaced by `receiverStream`. */ @deprecated("Use receiverStream", "1.0.0") def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = { @@ -285,7 +285,7 @@ class StreamingContext private[streaming] ( /** * Create an input stream with any arbitrary user implemented receiver. - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver */ def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = { @@ -549,7 +549,7 @@ class StreamingContext private[streaming] ( // Verify whether the DStream checkpoint is serializable if (isCheckpointingEnabled) { - val checkpoint = new Checkpoint(this, Time.apply(0)) + val checkpoint = new Checkpoint(this, Time(0)) try { Checkpoint.serialize(checkpoint, conf) } catch { @@ -575,9 +575,9 @@ class StreamingContext private[streaming] ( * * Return the current state of the context. The context can be in three possible states - * - * - StreamingContextState.INTIALIZED - The context has been created, but not been started yet. + * - StreamingContextState.INITIALIZED - The context has been created, but not started yet. * Input DStreams, transformations and output operations can be created on the context. - * - StreamingContextState.ACTIVE - The context has been started, and been not stopped. + * - StreamingContextState.ACTIVE - The context has been started, and not stopped. * Input DStreams, transformations and output operations cannot be created on the context. * - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index c59348a89d..1dfb4e7abc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -103,7 +103,7 @@ abstract class DStream[T: ClassTag] ( // Reference to whole DStream graph private[streaming] var graph: DStreamGraph = null - private[streaming] def isInitialized = (zeroTime != null) + private[streaming] def isInitialized = zeroTime != null // Duration for which the DStream requires its parent DStream to remember each RDD created private[streaming] def parentRememberDuration = rememberDuration @@ -189,15 +189,15 @@ abstract class DStream[T: ClassTag] ( */ private[streaming] def initialize(time: Time) { if (zeroTime != null && zeroTime != time) { - throw new SparkException("ZeroTime is already initialized to " + zeroTime - + ", cannot initialize it again to " + time) + throw new SparkException(s"ZeroTime is already initialized to $zeroTime" + + s", cannot initialize it again to $time") } zeroTime = time // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger if (mustCheckpoint && checkpointDuration == null) { checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt - logInfo("Checkpoint interval automatically set to " + checkpointDuration) + logInfo(s"Checkpoint interval automatically set to $checkpointDuration") } // Set the minimum value of the rememberDuration if not already set @@ -234,7 +234,7 @@ abstract class DStream[T: ClassTag] ( require( !mustCheckpoint || checkpointDuration != null, - "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." + + s"The checkpoint interval for ${this.getClass.getSimpleName} has not been set." + " Please use DStream.checkpoint() to set the interval." ) @@ -245,53 +245,53 @@ abstract class DStream[T: ClassTag] ( require( checkpointDuration == null || checkpointDuration >= slideDuration, - "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + - "Please set it to at least " + slideDuration + "." + s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " + + s"$checkpointDuration which is lower than its slide time ($slideDuration). " + + s"Please set it to at least $slideDuration." ) require( checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), - "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " + - "Please set it to a multiple of " + slideDuration + "." + s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " + + s" $checkpointDuration which not a multiple of its slide time ($slideDuration). " + + s"Please set it to a multiple of $slideDuration." ) require( checkpointDuration == null || storageLevel != StorageLevel.NONE, - "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + + s"${this.getClass.getSimpleName} has been marked for checkpointing but the storage " + "level has not been set to enable persisting. Please use DStream.persist() to set the " + "storage level to use memory for better checkpointing performance." ) require( checkpointDuration == null || rememberDuration > checkpointDuration, - "The remember duration for " + this.getClass.getSimpleName + " has been set to " + - rememberDuration + " which is not more than the checkpoint interval (" + - checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." + s"The remember duration for ${this.getClass.getSimpleName} has been set to " + + s" $rememberDuration which is not more than the checkpoint interval" + + s" ($checkpointDuration). Please set it to higher than $checkpointDuration." ) dependencies.foreach(_.validateAtStart()) - logInfo("Slide time = " + slideDuration) - logInfo("Storage level = " + storageLevel) - logInfo("Checkpoint interval = " + checkpointDuration) - logInfo("Remember duration = " + rememberDuration) - logInfo("Initialized and validated " + this) + logInfo(s"Slide time = $slideDuration") + logInfo(s"Storage level = ${storageLevel.description}") + logInfo(s"Checkpoint interval = $checkpointDuration") + logInfo(s"Remember duration = $rememberDuration") + logInfo(s"Initialized and validated $this") } private[streaming] def setContext(s: StreamingContext) { if (ssc != null && ssc != s) { - throw new SparkException("Context is already set in " + this + ", cannot set it again") + throw new SparkException(s"Context must not be set again for $this") } ssc = s - logInfo("Set context for " + this) + logInfo(s"Set context for $this") dependencies.foreach(_.setContext(ssc)) } private[streaming] def setGraph(g: DStreamGraph) { if (graph != null && graph != g) { - throw new SparkException("Graph is already set in " + this + ", cannot set it again") + throw new SparkException(s"Graph must not be set again for $this") } graph = g dependencies.foreach(_.setGraph(graph)) @@ -300,7 +300,7 @@ abstract class DStream[T: ClassTag] ( private[streaming] def remember(duration: Duration) { if (duration != null && (rememberDuration == null || duration > rememberDuration)) { rememberDuration = duration - logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) + logInfo(s"Duration for remembering RDDs set to $rememberDuration for $this") } dependencies.foreach(_.remember(parentRememberDuration)) } @@ -310,11 +310,11 @@ abstract class DStream[T: ClassTag] ( if (!isInitialized) { throw new SparkException (this + " has not been initialized") } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { - logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + - " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) + logInfo(s"Time $time is invalid as zeroTime is $zeroTime" + + s" , slideDuration is $slideDuration and difference is ${time - zeroTime}") false } else { - logDebug("Time " + time + " is valid") + logDebug(s"Time $time is valid") true } } @@ -452,20 +452,20 @@ abstract class DStream[T: ClassTag] ( oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]") generatedRDDs --= oldRDDs.keys if (unpersistData) { - logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", ")) + logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}") oldRDDs.values.foreach { rdd => rdd.unpersist(false) // Explicitly remove blocks of BlockRDD rdd match { case b: BlockRDD[_] => - logInfo("Removing blocks of RDD " + b + " of time " + time) + logInfo(s"Removing blocks of RDD $b of time $time") b.removeBlocks() case _ => } } } - logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + - (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) + logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " + + s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}") dependencies.foreach(_.clearMetadata(time)) } @@ -477,10 +477,10 @@ abstract class DStream[T: ClassTag] ( * this method to save custom checkpoint data. */ private[streaming] def updateCheckpointData(currentTime: Time) { - logDebug("Updating checkpoint data for time " + currentTime) + logDebug(s"Updating checkpoint data for time $currentTime") checkpointData.update(currentTime) dependencies.foreach(_.updateCheckpointData(currentTime)) - logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) + logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData") } private[streaming] def clearCheckpointData(time: Time) { @@ -509,13 +509,13 @@ abstract class DStream[T: ClassTag] ( @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { - logDebug(this.getClass().getSimpleName + ".writeObject used") + logDebug(s"${this.getClass().getSimpleName}.writeObject used") if (graph != null) { graph.synchronized { if (graph.checkpointInProgress) { oos.defaultWriteObject() } else { - val msg = "Object of " + this.getClass.getName + " is being serialized " + + val msg = s"Object of ${this.getClass.getName} is being serialized " + " possibly as a part of closure of an RDD operation. This is because " + " the DStream object is being referred to from within the closure. " + " Please rewrite the RDD operation inside this DStream to avoid this. " + @@ -532,7 +532,7 @@ abstract class DStream[T: ClassTag] ( @throws(classOf[IOException]) private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { - logDebug(this.getClass().getSimpleName + ".readObject used") + logDebug(s"${this.getClass().getSimpleName}.readObject used") ois.defaultReadObject() generatedRDDs = new HashMap[Time, RDD[T]] () } @@ -756,7 +756,7 @@ abstract class DStream[T: ClassTag] ( val firstNum = rdd.take(num + 1) // scalastyle:off println println("-------------------------------------------") - println("Time: " + time) + println(s"Time: $time") println("-------------------------------------------") firstNum.take(num).foreach(println) if (firstNum.length > num) println("...") @@ -903,21 +903,19 @@ abstract class DStream[T: ClassTag] ( val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) { toTime } else { - logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") - toTime.floor(slideDuration, zeroTime) + logWarning(s"toTime ($toTime) is not a multiple of slideDuration ($slideDuration)") + toTime.floor(slideDuration, zeroTime) } val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) { fromTime } else { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + logWarning(s"fromTime ($fromTime) is not a multiple of slideDuration ($slideDuration)") fromTime.floor(slideDuration, zeroTime) } - logInfo("Slicing from " + fromTime + " to " + toTime + - " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") + logInfo(s"Slicing from $fromTime to $toTime" + + s" (aligned to $alignedFromTime and $alignedToTime)") alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { if (time >= zeroTime) getOrCompute(time) else None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 95994c983c..d60f418e5c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -28,7 +28,8 @@ import org.apache.spark.util.Utils /** * This is the abstract base class for all input streams. This class provides methods - * start() and stop() which is called by Spark Streaming system to start and stop receiving data. + * start() and stop() which are called by Spark Streaming system to start and stop + * receiving data, respectively. * Input streams that can generate RDDs from new data by running a service/thread only on * the driver node (that is, without running a receiver on worker nodes), can be * implemented by directly inheriting this InputDStream. For example, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index a18551fac7..565b137228 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -32,7 +32,7 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] * that has to start a receiver on worker nodes to receive external data. * Specific implementations of ReceiverInputDStream must - * define `the getReceiver()` function that gets the receiver object of type + * define [[getReceiver]] function that gets the receiver object of type * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent * to the workers to receive data. * @param ssc_ Streaming context that will execute this input stream @@ -121,7 +121,7 @@ abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext) } if (validBlockIds.size != blockIds.size) { logWarning("Some blocks could not be recovered as they were not found in memory. " + - "To prevent such data loss, enabled Write Ahead Log (see programming guide " + + "To prevent such data loss, enable Write Ahead Log (see programming guide " + "for more details.") } new BlockRDD[T](ssc.sc, validBlockIds) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 43c605af73..faa5aca1d8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -69,7 +69,7 @@ private[streaming] class BlockManagerBasedBlockHandler( def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { - var numRecords = None: Option[Long] + var numRecords: Option[Long] = None val putResult: Seq[(BlockId, BlockStatus)] = block match { case ArrayBufferBlock(arrayBuffer) => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index b08152485a..639f4259e2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -103,7 +103,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable /** * This method is called by the system when the receiver is stopped. All resources - * (threads, buffers, etc.) setup in `onStart()` must be cleaned up in this method. + * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method. */ def onStop() @@ -273,7 +273,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable /** Get the attached supervisor. */ private[streaming] def supervisor: ReceiverSupervisor = { assert(_supervisor != null, - "A ReceiverSupervisor have not been attached to the receiver yet. Maybe you are starting " + + "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " + "some computation in the receiver before the Receiver.onStart() has been called.") _supervisor } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index c42a9ac233..d0195fb14f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -143,10 +143,10 @@ private[streaming] abstract class ReceiverSupervisor( def startReceiver(): Unit = synchronized { try { if (onReceiverStart()) { - logInfo("Starting receiver") + logInfo(s"Starting receiver $streamId") receiverState = Started receiver.onStart() - logInfo("Called receiver onStart") + logInfo(s"Called receiver $streamId onStart") } else { // The driver refused us stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None) @@ -218,11 +218,9 @@ private[streaming] abstract class ReceiverSupervisor( stopLatch.await() if (stoppingError != null) { logError("Stopped receiver with error: " + stoppingError) + throw stoppingError } else { logInfo("Stopped receiver without error") } - if (stoppingError != null) { - throw stoppingError - } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index f76300351e..6e7232a2a0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -59,17 +59,15 @@ case class JobSet( // Time taken to process all the jobs from the time they were submitted // (i.e. including the time they wait in the streaming scheduler queue) - def totalDelay: Long = { - processingEndTime - time.milliseconds - } + def totalDelay: Long = processingEndTime - time.milliseconds def toBatchInfo: BatchInfo = { BatchInfo( time, streamIdToInputInfo, submissionTime, - if (processingStartTime >= 0) Some(processingStartTime) else None, - if (processingEndTime >= 0) Some(processingEndTime) else None, + if (hasStarted) Some(processingStartTime) else None, + if (hasCompleted) Some(processingEndTime) else None, jobs.map { job => (job.outputOpId, job.toOutputOperationInfo) }.toMap ) } -- cgit v1.2.3