aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorJacek Laskowski <jacek@japila.pl>2016-05-05 16:34:27 -0700
committerAndrew Or <andrew@databricks.com>2016-05-05 16:34:27 -0700
commitbbb77734374010e36731bf6db1fac0273de8206d (patch)
tree0ee24dee864521415ce1ae5e3a0b9857e147b4c9 /streaming/src
parent02c07e8999dca545849cb3aa758a624dc51cd1e9 (diff)
downloadspark-bbb77734374010e36731bf6db1fac0273de8206d.tar.gz
spark-bbb77734374010e36731bf6db1fac0273de8206d.tar.bz2
spark-bbb77734374010e36731bf6db1fac0273de8206d.zip
[SPARK-15152][DOC][MINOR] Scaladoc and Code style Improvements
## What changes were proposed in this pull request? Minor doc and code style fixes ## How was this patch tested? local build Author: Jacek Laskowski <jacek@japila.pl> Closes #12928 from jaceklaskowski/SPARK-15152.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala49
1 files changed, 25 insertions, 24 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7d8b8679c5..6ececb1062 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -84,7 +84,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
assert(framework != null, "Checkpoint.framework is null")
assert(graph != null, "Checkpoint.graph is null")
assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
- logInfo("Checkpoint for time " + checkpointTime + " validated")
+ logInfo(s"Checkpoint for time $checkpointTime validated")
}
}
@@ -103,7 +103,10 @@ object Checkpoint extends Logging {
new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
}
- /** Get checkpoint files present in the give directory, ordered by oldest-first */
+ /**
+ * @param checkpointDir checkpoint directory to read checkpoint files from
+ * @return checkpoint files from the `checkpointDir` checkpoint directory, ordered by oldest-first
+ */
def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = {
def sortFunc(path1: Path, path2: Path): Boolean = {
@@ -121,11 +124,11 @@ object Checkpoint extends Logging {
val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
filtered.sortWith(sortFunc)
} else {
- logWarning("Listing " + path + " returned null")
+ logWarning(s"Listing $path returned null")
Seq.empty
}
} else {
- logInfo("Checkpoint directory " + path + " does not exist")
+ logWarning(s"Checkpoint directory $path does not exist")
Seq.empty
}
}
@@ -205,7 +208,7 @@ class CheckpointWriter(
// time of a batch is greater than the batch interval, checkpointing for completing an old
// batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
// batch actually has the latest information, so we want to recovery from it. Therefore, we
- // also use the latest checkpoint time as the file name, so that we can recovery from the
+ // also use the latest checkpoint time as the file name, so that we can recover from the
// latest checkpoint file.
//
// Note: there is only one thread writing the checkpoint files, so we don't need to worry
@@ -216,8 +219,7 @@ class CheckpointWriter(
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
- logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
- + "'")
+ logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
// Write checkpoint to temp file
if (fs.exists(tempFile)) {
@@ -237,39 +239,38 @@ class CheckpointWriter(
fs.delete(backupFile, true) // just in case it exists
}
if (!fs.rename(checkpointFile, backupFile)) {
- logWarning("Could not rename " + checkpointFile + " to " + backupFile)
+ logWarning(s"Could not rename $checkpointFile to $backupFile")
}
}
// Rename temp file to the final checkpoint file
if (!fs.rename(tempFile, checkpointFile)) {
- logWarning("Could not rename " + tempFile + " to " + checkpointFile)
+ logWarning(s"Could not rename $tempFile to $checkpointFile")
}
// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
if (allCheckpointFiles.size > 10) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
- logInfo("Deleting " + file)
+ logInfo(s"Deleting $file")
fs.delete(file, true)
}
}
// All done, print success
val finishTime = System.currentTimeMillis()
- logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
- "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
+ logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" +
+ s", took ${bytes.length} bytes and ${finishTime - startTime} ms")
jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
return
} catch {
case ioe: IOException =>
- logWarning("Error in attempt " + attempts + " of writing checkpoint to "
- + checkpointFile, ioe)
+ val msg = s"Error in attempt $attempts of writing checkpoint to '$checkpointFile'"
+ logWarning(msg, ioe)
fs = null
}
}
- logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
- + checkpointFile + "'")
+ logWarning(s"Could not write checkpoint for time $checkpointTime to file '$checkpointFile'")
}
}
@@ -278,7 +279,7 @@ class CheckpointWriter(
val bytes = Checkpoint.serialize(checkpoint, conf)
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
- logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
+ logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to writer queue")
} catch {
case rej: RejectedExecutionException =>
logError("Could not submit checkpoint task to the thread pool executor", rej)
@@ -295,8 +296,8 @@ class CheckpointWriter(
executor.shutdownNow()
}
val endTime = System.currentTimeMillis()
- logInfo("CheckpointWriter executor terminated ? " + terminated +
- ", waited for " + (endTime - startTime) + " ms.")
+ logInfo(s"CheckpointWriter executor terminated? $terminated," +
+ s" waited for ${endTime - startTime} ms.")
stopped = true
}
}
@@ -336,20 +337,20 @@ object CheckpointReader extends Logging {
}
// Try to read the checkpoint files in the order
- logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
+ logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
var readError: Exception = null
checkpointFiles.foreach { file =>
- logInfo("Attempting to load checkpoint from file " + file)
+ logInfo(s"Attempting to load checkpoint from file $file")
try {
val fis = fs.open(file)
val cp = Checkpoint.deserialize(fis, conf)
- logInfo("Checkpoint successfully loaded from file " + file)
- logInfo("Checkpoint was generated at time " + cp.checkpointTime)
+ logInfo(s"Checkpoint successfully loaded from file $file")
+ logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
return Some(cp)
} catch {
case e: Exception =>
readError = e
- logWarning("Error reading checkpoint from file " + file, e)
+ logWarning(s"Error reading checkpoint from file $file", e)
}
}