author    Jacek Laskowski <jacek@japila.pl>        2016-01-07 21:12:57 +0000
committer Sean Owen <sowen@cloudera.com>           2016-01-07 21:12:57 +0000
commit    1b2c2162af4d5d2d950af94571e69273b49bf913 (patch)
tree      38e73ce8a25605b4432b3cf2bd8e4b69a6e9897a /streaming
parent    07b314a57a638a232ee0b5cd14169e57d742f0f9 (diff)
[STREAMING][MINOR] More contextual information in logs + minor code improvements

Please review and merge at your convenience. Thanks!

Author: Jacek Laskowski <jacek@japila.pl>

Closes #10595 from jaceklaskowski/streaming-minor-fixes.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala               | 86
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala          |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala  |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala             |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala   |  8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala              |  8
8 files changed, 61 insertions, 66 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a5ab666975..ca0a21fbb7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -226,7 +226,7 @@ class StreamingContext private[streaming] (
* Set the context to periodically checkpoint the DStream operations for driver
* fault-tolerance.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
- * Note that this must be a fault-tolerant file system like HDFS for
+ * Note that this must be a fault-tolerant file system like HDFS.
*/
def checkpoint(directory: String) {
if (directory != null) {
@@ -274,7 +274,7 @@ class StreamingContext private[streaming] (
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*
- * @deprecated As of 1.0.0", replaced by `receiverStream`.
+ * @deprecated As of 1.0.0 replaced by `receiverStream`.
*/
@deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
@@ -285,7 +285,7 @@ class StreamingContext private[streaming] (
/**
* Create an input stream with any arbitrary user implemented receiver.
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
@@ -549,7 +549,7 @@ class StreamingContext private[streaming] (
// Verify whether the DStream checkpoint is serializable
if (isCheckpointingEnabled) {
- val checkpoint = new Checkpoint(this, Time.apply(0))
+ val checkpoint = new Checkpoint(this, Time(0))
try {
Checkpoint.serialize(checkpoint, conf)
} catch {
@@ -575,9 +575,9 @@ class StreamingContext private[streaming] (
*
* Return the current state of the context. The context can be in three possible states -
*
- * - StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
+ * - StreamingContextState.INITIALIZED - The context has been created, but not started yet.
* Input DStreams, transformations and output operations can be created on the context.
- * - StreamingContextState.ACTIVE - The context has been started, and been not stopped.
+ * - StreamingContextState.ACTIVE - The context has been started, and not stopped.
* Input DStreams, transformations and output operations cannot be created on the context.
* - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index c59348a89d..1dfb4e7abc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -103,7 +103,7 @@ abstract class DStream[T: ClassTag] (
// Reference to whole DStream graph
private[streaming] var graph: DStreamGraph = null
- private[streaming] def isInitialized = (zeroTime != null)
+ private[streaming] def isInitialized = zeroTime != null
// Duration for which the DStream requires its parent DStream to remember each RDD created
private[streaming] def parentRememberDuration = rememberDuration
@@ -189,15 +189,15 @@ abstract class DStream[T: ClassTag] (
*/
private[streaming] def initialize(time: Time) {
if (zeroTime != null && zeroTime != time) {
- throw new SparkException("ZeroTime is already initialized to " + zeroTime
- + ", cannot initialize it again to " + time)
+ throw new SparkException(s"ZeroTime is already initialized to $zeroTime"
+ + s", cannot initialize it again to $time")
}
zeroTime = time
// Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
- logInfo("Checkpoint interval automatically set to " + checkpointDuration)
+ logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
}
// Set the minimum value of the rememberDuration if not already set
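
The recurring pattern in this patch swaps string concatenation for Scala's s-interpolator; a standalone illustration with a hypothetical stand-in value:

    val checkpointDuration = "10000 ms"   // hypothetical value, for illustration only
    // before: "Checkpoint interval automatically set to " + checkpointDuration
    // after: the interpolator splices $name (or ${expr}) into the literal
    val msg = s"Checkpoint interval automatically set to $checkpointDuration"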
@@ -234,7 +234,7 @@ abstract class DStream[T: ClassTag] (
require(
!mustCheckpoint || checkpointDuration != null,
- "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
+ s"The checkpoint interval for ${this.getClass.getSimpleName} has not been set." +
" Please use DStream.checkpoint() to set the interval."
)
@@ -245,53 +245,53 @@ abstract class DStream[T: ClassTag] (
require(
checkpointDuration == null || checkpointDuration >= slideDuration,
- "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
- checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
- "Please set it to at least " + slideDuration + "."
+ s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " +
+ s"$checkpointDuration which is lower than its slide time ($slideDuration). " +
+ s"Please set it to at least $slideDuration."
)
require(
checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
- "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
- checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
- "Please set it to a multiple of " + slideDuration + "."
+ s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " +
+ s" $checkpointDuration which not a multiple of its slide time ($slideDuration). " +
+ s"Please set it to a multiple of $slideDuration."
)
require(
checkpointDuration == null || storageLevel != StorageLevel.NONE,
- "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
+ s"${this.getClass.getSimpleName} has been marked for checkpointing but the storage " +
"level has not been set to enable persisting. Please use DStream.persist() to set the " +
"storage level to use memory for better checkpointing performance."
)
require(
checkpointDuration == null || rememberDuration > checkpointDuration,
- "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
- rememberDuration + " which is not more than the checkpoint interval (" +
- checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
+ s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
+ s" $rememberDuration which is not more than the checkpoint interval" +
+ s" ($checkpointDuration). Please set it to higher than $checkpointDuration."
)
dependencies.foreach(_.validateAtStart())
- logInfo("Slide time = " + slideDuration)
- logInfo("Storage level = " + storageLevel)
- logInfo("Checkpoint interval = " + checkpointDuration)
- logInfo("Remember duration = " + rememberDuration)
- logInfo("Initialized and validated " + this)
+ logInfo(s"Slide time = $slideDuration")
+ logInfo(s"Storage level = ${storageLevel.description}")
+ logInfo(s"Checkpoint interval = $checkpointDuration")
+ logInfo(s"Remember duration = $rememberDuration")
+ logInfo(s"Initialized and validated $this")
}
private[streaming] def setContext(s: StreamingContext) {
if (ssc != null && ssc != s) {
- throw new SparkException("Context is already set in " + this + ", cannot set it again")
+ throw new SparkException(s"Context must not be set again for $this")
}
ssc = s
- logInfo("Set context for " + this)
+ logInfo(s"Set context for $this")
dependencies.foreach(_.setContext(ssc))
}
private[streaming] def setGraph(g: DStreamGraph) {
if (graph != null && graph != g) {
- throw new SparkException("Graph is already set in " + this + ", cannot set it again")
+ throw new SparkException(s"Graph must not be set again for $this")
}
graph = g
dependencies.foreach(_.setGraph(graph))
@@ -300,7 +300,7 @@ abstract class DStream[T: ClassTag] (
private[streaming] def remember(duration: Duration) {
if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
rememberDuration = duration
- logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
+ logInfo(s"Duration for remembering RDDs set to $rememberDuration for $this")
}
dependencies.foreach(_.remember(parentRememberDuration))
}
@@ -310,11 +310,11 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new SparkException (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
- logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
- " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+ logInfo(s"Time $time is invalid as zeroTime is $zeroTime" +
+ s" , slideDuration is $slideDuration and difference is ${time - zeroTime}")
false
} else {
- logDebug("Time " + time + " is valid")
+ logDebug(s"Time $time is valid")
true
}
}
@@ -452,20 +452,20 @@ abstract class DStream[T: ClassTag] (
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys
if (unpersistData) {
- logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
+ logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
oldRDDs.values.foreach { rdd =>
rdd.unpersist(false)
// Explicitly remove blocks of BlockRDD
rdd match {
case b: BlockRDD[_] =>
- logInfo("Removing blocks of RDD " + b + " of time " + time)
+ logInfo(s"Removing blocks of RDD $b of time $time")
b.removeBlocks()
case _ =>
}
}
}
- logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
- (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
+ s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")
dependencies.foreach(_.clearMetadata(time))
}
@@ -477,10 +477,10 @@ abstract class DStream[T: ClassTag] (
* this method to save custom checkpoint data.
*/
private[streaming] def updateCheckpointData(currentTime: Time) {
- logDebug("Updating checkpoint data for time " + currentTime)
+ logDebug(s"Updating checkpoint data for time $currentTime")
checkpointData.update(currentTime)
dependencies.foreach(_.updateCheckpointData(currentTime))
- logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
+ logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}
private[streaming] def clearCheckpointData(time: Time) {
@@ -509,13 +509,13 @@ abstract class DStream[T: ClassTag] (
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
- logDebug(this.getClass().getSimpleName + ".writeObject used")
+ logDebug(s"${this.getClass().getSimpleName}.writeObject used")
if (graph != null) {
graph.synchronized {
if (graph.checkpointInProgress) {
oos.defaultWriteObject()
} else {
- val msg = "Object of " + this.getClass.getName + " is being serialized " +
+ val msg = s"Object of ${this.getClass.getName} is being serialized " +
" possibly as a part of closure of an RDD operation. This is because " +
" the DStream object is being referred to from within the closure. " +
" Please rewrite the RDD operation inside this DStream to avoid this. " +
@@ -532,7 +532,7 @@ abstract class DStream[T: ClassTag] (
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
- logDebug(this.getClass().getSimpleName + ".readObject used")
+ logDebug(s"${this.getClass().getSimpleName}.readObject used")
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[T]] ()
}
@@ -756,7 +756,7 @@ abstract class DStream[T: ClassTag] (
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
- println("Time: " + time)
+ println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
@@ -903,21 +903,19 @@ abstract class DStream[T: ClassTag] (
val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
toTime
} else {
- logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
- + slideDuration + ")")
- toTime.floor(slideDuration, zeroTime)
+ logWarning(s"toTime ($toTime) is not a multiple of slideDuration ($slideDuration)")
+ toTime.floor(slideDuration, zeroTime)
}
val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
fromTime
} else {
- logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
- + slideDuration + ")")
+ logWarning(s"fromTime ($fromTime) is not a multiple of slideDuration ($slideDuration)")
fromTime.floor(slideDuration, zeroTime)
}
- logInfo("Slicing from " + fromTime + " to " + toTime +
- " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+ logInfo(s"Slicing from $fromTime to $toTime" +
+ s" (aligned to $alignedFromTime and $alignedToTime)")
alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
if (time >= zeroTime) getOrCompute(time) else None
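
The slice hunks above only reword log messages; for orientation, a hedged usage sketch of slice itself, assuming an existing DStream `stream` and hypothetical epoch-millisecond timestamps:

    import org.apache.spark.streaming.Time
    // both endpoints are floored to the slide duration, as the warnings above note
    val rdds = stream.slice(Time(1452200000000L), Time(1452200010000L))
    rdds.foreach(rdd => println(rdd.count()))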
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 95994c983c..d60f418e5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -28,7 +28,8 @@ import org.apache.spark.util.Utils
/**
* This is the abstract base class for all input streams. This class provides methods
- * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
+ * start() and stop() which are called by Spark Streaming system to start and stop
+ * receiving data, respectively.
* Input streams that can generate RDDs from new data by running a service/thread only on
* the driver node (that is, without running a receiver on worker nodes), can be
* implemented by directly inheriting this InputDStream. For example,
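
Picking up the comment's thread, a hedged sketch of such a driver-only input stream (all names here are illustrative, not from the patch):

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream

    class DriverOnlyInputDStream[T: ClassTag](ssc_ : StreamingContext, data: Seq[T])
      extends InputDStream[T](ssc_) {
      override def start(): Unit = {}   // called by the system when streaming starts
      override def stop(): Unit = {}    // called by the system when streaming stops
      override def compute(validTime: Time): Option[RDD[T]] =
        Some(ssc_.sparkContext.parallelize(data))   // one new RDD per batch interval
    }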
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index a18551fac7..565b137228 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
* that has to start a receiver on worker nodes to receive external data.
* Specific implementations of ReceiverInputDStream must
- * define `the getReceiver()` function that gets the receiver object of type
+ * define [[getReceiver]] function that gets the receiver object of type
* [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
* to the workers to receive data.
* @param ssc_ Streaming context that will execute this input stream
@@ -121,7 +121,7 @@ abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
}
if (validBlockIds.size != blockIds.size) {
logWarning("Some blocks could not be recovered as they were not found in memory. " +
- "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
+ "To prevent such data loss, enable Write Ahead Log (see programming guide " +
"for more details.")
}
new BlockRDD[T](ssc.sc, validBlockIds)
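
A minimal sketch of the getReceiver() contract described above; MyReceiver stands in for a hypothetical Receiver[String] implementation (one possible shape appears under Receiver.scala below):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver

    class MyInputDStream(ssc_ : StreamingContext)
      extends ReceiverInputDStream[String](ssc_) {
      // the returned object is serialized and sent to a worker to receive data
      override def getReceiver(): Receiver[String] =
        new MyReceiver(StorageLevel.MEMORY_AND_DISK_2)
    }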
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 43c605af73..faa5aca1d8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -69,7 +69,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
- var numRecords = None: Option[Long]
+ var numRecords: Option[Long] = None
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
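
Both spellings give numRecords the type Option[Long]; the change replaces type ascription on the value with the more conventional annotation on the variable:

    var numRecordsAscribed = None: Option[Long]    // ascription on the value (before)
    var numRecordsAnnotated: Option[Long] = None   // annotation on the variable (after)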
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index b08152485a..639f4259e2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -103,7 +103,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
/**
* This method is called by the system when the receiver is stopped. All resources
- * (threads, buffers, etc.) setup in `onStart()` must be cleaned up in this method.
+ * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
*/
def onStop()
@@ -273,7 +273,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
/** Get the attached supervisor. */
private[streaming] def supervisor: ReceiverSupervisor = {
assert(_supervisor != null,
- "A ReceiverSupervisor have not been attached to the receiver yet. Maybe you are starting " +
+ "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
"some computation in the receiver before the Receiver.onStart() has been called.")
_supervisor
}
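
The corrected comments concern the onStart()/onStop() contract; a minimal, hedged sketch of a receiver honoring it (the socket details are illustrative only):

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class LineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_ONLY) {

      @volatile private var socket: Socket = _

      override def onStart(): Unit = {
        socket = new Socket(host, port)
        new Thread("line-receiver") {
          override def run(): Unit = {
            val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
            var line = reader.readLine()
            while (!isStopped() && line != null) {
              store(line)              // hand each record to Spark
              line = reader.readLine()
            }
          }
        }.start()
      }

      // everything set up in onStart() is cleaned up here
      override def onStop(): Unit = {
        if (socket != null) socket.close()
      }
    }

Such a receiver would be attached with ssc.receiverStream(new LineReceiver("localhost", 9999)), the method whose doc is touched earlier in this patch.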
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index c42a9ac233..d0195fb14f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -143,10 +143,10 @@ private[streaming] abstract class ReceiverSupervisor(
def startReceiver(): Unit = synchronized {
try {
if (onReceiverStart()) {
- logInfo("Starting receiver")
+ logInfo(s"Starting receiver $streamId")
receiverState = Started
receiver.onStart()
- logInfo("Called receiver onStart")
+ logInfo(s"Called receiver $streamId onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
@@ -218,11 +218,9 @@ private[streaming] abstract class ReceiverSupervisor(
stopLatch.await()
if (stoppingError != null) {
logError("Stopped receiver with error: " + stoppingError)
+ throw stoppingError
} else {
logInfo("Stopped receiver without error")
}
- if (stoppingError != null) {
- throw stoppingError
- }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index f76300351e..6e7232a2a0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -59,17 +59,15 @@ case class JobSet(
// Time taken to process all the jobs from the time they were submitted
// (i.e. including the time they wait in the streaming scheduler queue)
- def totalDelay: Long = {
- processingEndTime - time.milliseconds
- }
+ def totalDelay: Long = processingEndTime - time.milliseconds
def toBatchInfo: BatchInfo = {
BatchInfo(
time,
streamIdToInputInfo,
submissionTime,
- if (processingStartTime >= 0) Some(processingStartTime) else None,
- if (processingEndTime >= 0) Some(processingEndTime) else None,
+ if (hasStarted) Some(processingStartTime) else None,
+ if (hasCompleted) Some(processingEndTime) else None,
jobs.map { job => (job.outputOpId, job.toOutputOperationInfo) }.toMap
)
}
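
For the delay accounting above, a worked example with hypothetical epoch-millisecond values:

    // time (batch time)    = 1452200000000
    // processingStartTime  = 1452200000200
    // processingEndTime    = 1452200001200
    // processingDelay = processingEndTime - processingStartTime = 1000 ms
    // totalDelay      = processingEndTime - time.milliseconds   = 1200 ms
    //                   (includes the 200 ms spent waiting in the scheduler queue)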