diff options
author | Jacek Laskowski <jacek@japila.pl> | 2016-05-05 16:34:27 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-05 16:34:27 -0700 |
commit | bbb77734374010e36731bf6db1fac0273de8206d (patch) | |
tree | 0ee24dee864521415ce1ae5e3a0b9857e147b4c9 /streaming/src/main | |
parent | 02c07e8999dca545849cb3aa758a624dc51cd1e9 (diff) | |
download | spark-bbb77734374010e36731bf6db1fac0273de8206d.tar.gz spark-bbb77734374010e36731bf6db1fac0273de8206d.tar.bz2 spark-bbb77734374010e36731bf6db1fac0273de8206d.zip |
[SPARK-15152][DOC][MINOR] Scaladoc and Code style Improvements
## What changes were proposed in this pull request?
Minor doc and code style fixes
## How was this patch tested?
local build
Author: Jacek Laskowski <jacek@japila.pl>
Closes #12928 from jaceklaskowski/SPARK-15152.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 49 |
1 files changed, 25 insertions, 24 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 7d8b8679c5..6ececb1062 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -84,7 +84,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) assert(framework != null, "Checkpoint.framework is null") assert(graph != null, "Checkpoint.graph is null") assert(checkpointTime != null, "Checkpoint.checkpointTime is null") - logInfo("Checkpoint for time " + checkpointTime + " validated") + logInfo(s"Checkpoint for time $checkpointTime validated") } } @@ -103,7 +103,10 @@ object Checkpoint extends Logging { new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk") } - /** Get checkpoint files present in the give directory, ordered by oldest-first */ + /** + * @param checkpointDir checkpoint directory to read checkpoint files from + * @return checkpoint files from the `checkpointDir` checkpoint directory, ordered by oldest-first + */ def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = { def sortFunc(path1: Path, path2: Path): Boolean = { @@ -121,11 +124,11 @@ object Checkpoint extends Logging { val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty) filtered.sortWith(sortFunc) } else { - logWarning("Listing " + path + " returned null") + logWarning(s"Listing $path returned null") Seq.empty } } else { - logInfo("Checkpoint directory " + path + " does not exist") + logWarning(s"Checkpoint directory $path does not exist") Seq.empty } } @@ -205,7 +208,7 @@ class CheckpointWriter( // time of a batch is greater than the batch interval, checkpointing for completing an old // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old // batch actually has the latest information, so we want to recovery from it. Therefore, we - // also use the latest checkpoint time as the file name, so that we can recovery from the + // also use the latest checkpoint time as the file name, so that we can recover from the // latest checkpoint file. // // Note: there is only one thread writing the checkpoint files, so we don't need to worry @@ -216,8 +219,7 @@ class CheckpointWriter( while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { - logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile - + "'") + logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'") // Write checkpoint to temp file if (fs.exists(tempFile)) { @@ -237,39 +239,38 @@ class CheckpointWriter( fs.delete(backupFile, true) // just in case it exists } if (!fs.rename(checkpointFile, backupFile)) { - logWarning("Could not rename " + checkpointFile + " to " + backupFile) + logWarning(s"Could not rename $checkpointFile to $backupFile") } } // Rename temp file to the final checkpoint file if (!fs.rename(tempFile, checkpointFile)) { - logWarning("Could not rename " + tempFile + " to " + checkpointFile) + logWarning(s"Could not rename $tempFile to $checkpointFile") } // Delete old checkpoint files val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)) if (allCheckpointFiles.size > 10) { allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file => - logInfo("Deleting " + file) + logInfo(s"Deleting $file") fs.delete(file, true) } } // All done, print success val finishTime = System.currentTimeMillis() - logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + - "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") + logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" + + s", took ${bytes.length} bytes and ${finishTime - startTime} ms") jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater) return } catch { case ioe: IOException => - logWarning("Error in attempt " + attempts + " of writing checkpoint to " - + checkpointFile, ioe) + val msg = s"Error in attempt $attempts of writing checkpoint to '$checkpointFile'" + logWarning(msg, ioe) fs = null } } - logWarning("Could not write checkpoint for time " + checkpointTime + " to file " - + checkpointFile + "'") + logWarning(s"Could not write checkpoint for time $checkpointTime to file '$checkpointFile'") } } @@ -278,7 +279,7 @@ class CheckpointWriter( val bytes = Checkpoint.serialize(checkpoint, conf) executor.execute(new CheckpointWriteHandler( checkpoint.checkpointTime, bytes, clearCheckpointDataLater)) - logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") + logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) @@ -295,8 +296,8 @@ class CheckpointWriter( executor.shutdownNow() } val endTime = System.currentTimeMillis() - logInfo("CheckpointWriter executor terminated ? " + terminated + - ", waited for " + (endTime - startTime) + " ms.") + logInfo(s"CheckpointWriter executor terminated? $terminated," + + s" waited for ${endTime - startTime} ms.") stopped = true } } @@ -336,20 +337,20 @@ object CheckpointReader extends Logging { } // Try to read the checkpoint files in the order - logInfo("Checkpoint files found: " + checkpointFiles.mkString(",")) + logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}") var readError: Exception = null checkpointFiles.foreach { file => - logInfo("Attempting to load checkpoint from file " + file) + logInfo(s"Attempting to load checkpoint from file $file") try { val fis = fs.open(file) val cp = Checkpoint.deserialize(fis, conf) - logInfo("Checkpoint successfully loaded from file " + file) - logInfo("Checkpoint was generated at time " + cp.checkpointTime) + logInfo(s"Checkpoint successfully loaded from file $file") + logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}") return Some(cp) } catch { case e: Exception => readError = e - logWarning("Error reading checkpoint from file " + file, e) + logWarning(s"Error reading checkpoint from file $file", e) } } |