author     Jacek Laskowski <jacek@japila.pl>    2016-05-05 16:34:27 -0700
committer  Andrew Or <andrew@databricks.com>    2016-05-05 16:34:27 -0700
commit     bbb77734374010e36731bf6db1fac0273de8206d (patch)
tree       0ee24dee864521415ce1ae5e3a0b9857e147b4c9
parent     02c07e8999dca545849cb3aa758a624dc51cd1e9 (diff)
[SPARK-15152][DOC][MINOR] Scaladoc and Code style Improvements
## What changes were proposed in this pull request?

Minor doc and code style fixes.

## How was this patch tested?

Local build.

Author: Jacek Laskowski <jacek@japila.pl>

Closes #12928 from jaceklaskowski/SPARK-15152.
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulator.scala                                      |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Pool.scala                                   |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala                    | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala                         |  4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala                            |  6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala                              |  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala       |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala                     |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                      | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala          |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala                        |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala     |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                        | 49
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                               |  4
15 files changed, 66 insertions(+), 68 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 23245043e2..9d1f1d59db 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -24,16 +24,17 @@ package org.apache.spark
* They can be used to implement counters (as in MapReduce) or sums. Spark natively supports
* accumulators of numeric value types, and programmers can add support for new types.
*
- * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
- * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
+ * An accumulator is created from an initial value `v` by calling
+ * [[SparkContext#accumulator SparkContext.accumulator]].
+ * Tasks running on the cluster can then add to it using the [[Accumulable#+= +=]] operator.
* However, they cannot read its value. Only the driver program can read the accumulator's value,
- * using its value method.
+ * using its [[#value]] method.
*
* The interpreter session below shows an accumulator being used to add up the elements of an array:
*
* {{{
* scala> val accum = sc.accumulator(0)
- * accum: spark.Accumulator[Int] = 0
+ * accum: org.apache.spark.Accumulator[Int] = 0
*
* scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
* ...
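As a quick companion to the updated Scaladoc, here is a minimal sketch of the documented pattern, assuming an active `SparkContext` named `sc` (as in the interpreter session above):

```scala
// Minimal sketch, assuming an active SparkContext `sc` (e.g. the spark-shell).
// Tasks running on the cluster may only add to the accumulator;
// only the driver reads it, via the `value` method.
val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
println(accum.value)  // 10
```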
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index a79e71ec7c..5987cfea2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -26,16 +26,14 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
- * An Schedulable entity that represent collection of Pools or TaskSetManagers
+ * A Schedulable entity that represents a collection of Pools or TaskSetManagers
*/
-
private[spark] class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
initWeight: Int)
- extends Schedulable
- with Logging {
+ extends Schedulable with Logging {
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
@@ -56,7 +54,8 @@ private[spark] class Pool(
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
case _ =>
- throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
+ val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
+ throw new IllegalArgumentException(msg)
}
}
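The new error message points at the scheduler mode setting; as a hedged configuration sketch (the application name is made up), this is how that mode is normally selected:

```scala
// The mode behind SchedulingMode is configured via spark.scheduler.mode;
// FAIR and FIFO (the default) are the only supported values.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("scheduling-mode-demo")   // hypothetical name, illustration only
  .set("spark.scheduler.mode", "FAIR")
```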
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 864941d468..18ebbbe78a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -36,11 +36,7 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
- if (res < 0) {
- true
- } else {
- false
- }
+ res < 0
}
}
@@ -52,12 +48,12 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
- val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
- val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
+ val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
+ val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
- var compare: Int = 0
+ var compare = 0
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
@@ -67,7 +63,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
} else {
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
-
if (compare < 0) {
true
} else if (compare > 0) {
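The FIFO hunk above collapses an `if/else` that returned boolean literals into the comparison itself; below is an illustrative, self-contained version of that comparator logic, with plain `Int` parameters standing in for the `Schedulable` fields:

```scala
// Illustrative only: FIFO ordering by priority, with stage id as the tie-breaker.
def fifoLessThan(priority1: Int, stageId1: Int, priority2: Int, stageId2: Int): Boolean = {
  var res = math.signum(priority1 - priority2)
  if (res == 0) {
    res = math.signum(stageId1 - stageId2)
  }
  res < 0  // equivalent to: if (res < 0) true else false
}
```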
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index bd26bfd848..93ac67e5db 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -170,9 +170,7 @@ private [util] class SparkShutdownHookManager {
@volatile private var shuttingDown = false
/**
- * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
- * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
- * the best.
+ * Install a hook to run at shutdown and run all registered hooks in order.
*/
def install(): Unit = {
val hookTask = new Runnable() {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
index 898ac2cc89..35bc46a5f3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -104,9 +104,9 @@ final class Binarizer(override val uid: String)
case DoubleType =>
BinaryAttribute.defaultAttr.withName(outputColName).toStructField()
case _: VectorUDT =>
- new StructField(outputColName, new VectorUDT, true)
- case other =>
- throw new IllegalArgumentException(s"Data type $other is not supported.")
+ StructField(outputColName, new VectorUDT)
+ case _ =>
+ throw new IllegalArgumentException(s"Data type $inputType is not supported.")
}
if (schema.fieldNames.contains(outputColName)) {
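For orientation, a hypothetical usage sketch of the `Binarizer` whose `transformSchema` is touched above; the column names and the `df` DataFrame are assumptions, not part of the patch:

```scala
// Hypothetical usage; assumes a DataFrame `df` with a Double column "feature".
import org.apache.spark.ml.feature.Binarizer
val binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)                     // values above 0.5 map to 1.0, the rest to 0.0
val binarized = binarizer.transform(df)
```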
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 774170ff40..86ce9705a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -104,7 +104,7 @@ object MLUtils {
val (indices, values) = items.tail.filter(_.nonEmpty).map { item =>
val indexAndValue = item.split(':')
val index = indexAndValue(0).toInt - 1 // Convert 1-based indices to 0-based.
- val value = indexAndValue(1).toDouble
+ val value = indexAndValue(1).toDouble
(index, value)
}.unzip
@@ -119,8 +119,7 @@ object MLUtils {
previous = current
i += 1
}
-
- (label, indices.toArray, values.toArray)
+ (label, indices, values)
}
/**
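The surrounding code parses LIBSVM records; here is a standalone sketch of that format with the same 1-based to 0-based index conversion (the sample line is made up):

```scala
// LIBSVM record: "label index1:value1 index2:value2 ...", indices 1-based on disk.
val line = "1.0 3:4.5 10:2.0"   // hypothetical record, for illustration only
val items = line.split(' ')
val label = items.head.toDouble
val (indices, values) = items.tail.filter(_.nonEmpty).map { item =>
  val indexAndValue = item.split(':')
  (indexAndValue(0).toInt - 1, indexAndValue(1).toDouble)  // convert to 0-based
}.unzip
// indices == Array(2, 9), values == Array(4.5, 2.0)
```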
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 516b41cb13..8b1a34f79c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
/**
- * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+ * Given a [[LogicalPlan]], returns a list of `PhysicalPlan`s that can
* be used for execution. If this strategy does not apply to the given logical operation then an
* empty list should be returned.
*/
@@ -31,9 +31,10 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends L
}
/**
- * Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
- * Child classes are responsible for specifying a list of [[Strategy]] objects that each of which
- * can return a list of possible physical plan options. If a given strategy is unable to plan all
+ * Abstract class for transforming [[LogicalPlan]]s into physical plans.
+ * Child classes are responsible for specifying a list of [[GenericStrategy]] objects, each of
+ * which can return a list of possible physical plan options.
+ * If a given strategy is unable to plan all
* of the remaining operators in the tree, it can call [[planLater]], which returns a placeholder
* object that will be filled in using other available strategies.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index c201822d44..1be41ffc07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartit
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
- * Apply the all of the GroupExpressions to every input row, hence we will get
+ * Apply all of the GroupExpressions to every input row, hence we will get
* multiple output rows for an input row.
* @param projections The group of expressions, all of the group expressions should
* output the same schema specified by the parameter `output`
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 0bbe970420..b94b84d77a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -106,16 +106,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
/**
- * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute after
- * preparations. Concrete implementations of SparkPlan should override doExecute.
+ * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
+ * preparations.
+ *
+ * Concrete implementations of SparkPlan should override `doExecute`.
*/
final def execute(): RDD[InternalRow] = executeQuery {
doExecute()
}
/**
- * Returns the result of this query as a broadcast variable by delegating to doBroadcast after
- * preparations. Concrete implementations of SparkPlan should override doBroadcast.
+ * Returns the result of this query as a broadcast variable by delegating to `doExecuteBroadcast`
+ * after preparations.
+ *
+ * Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
*/
final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
doExecuteBroadcast()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 15b4abe806..d6f7b6ed35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -105,7 +105,7 @@ trait CodegenSupport extends SparkPlan {
protected def doProduce(ctx: CodegenContext): String
/**
- * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
+ * Consume the generated columns or row from the current SparkPlan and call its parent's `doConsume()`.
*/
final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
val inputVars =
@@ -212,8 +212,8 @@ trait CodegenSupport extends SparkPlan {
/**
* InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
*
- * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
- * an RDD iterator of InternalRow.
+ * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
+ * that consumes an RDD iterator of InternalRow.
*/
case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 56a3906951..bafbbdf657 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -158,7 +158,7 @@ case class MapPartitionsExec(
* Applies the given function to each input object.
* The output of its child must be a single-field row containing the input object.
*
- * This operator is kind of a safe version of [[ProjectExec]], as it's output is custom object,
+ * This operator is kind of a safe version of [[ProjectExec]], as its output is a custom object,
* we need to use safe row to contain it.
*/
case class MapElementsExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 8e66538575..7b4c035bf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -95,7 +95,7 @@ class FileStreamSource(
val endId = end.asInstanceOf[LongOffset].offset
assert(startId <= endId)
- val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
+ val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logDebug(s"Streaming ${files.mkString(", ")}")
dataFrameBuilder(files)
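The one-line change above swaps `map(...).flatten` for the equivalent `flatMap`; a tiny self-contained check of that equivalence on made-up data:

```scala
// A batch-id -> files log shaped like metadataLog.get(...)'s result, values invented.
val batches = Seq(0L -> Seq("a.txt", "b.txt"), 1L -> Seq("c.txt"))
assert(batches.map(_._2).flatten == batches.flatMap(_._2))  // Seq(a.txt, b.txt, c.txt)
```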
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index b89144d727..e9052a3095 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -47,7 +47,7 @@ class IncrementalExecution(
/**
* Records the current id for a given stateful operator in the query plan as the `state`
- * preperation walks the query plan.
+ * preparation walks the query plan.
*/
private var operatorId = 0
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7d8b8679c5..6ececb1062 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -84,7 +84,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
assert(framework != null, "Checkpoint.framework is null")
assert(graph != null, "Checkpoint.graph is null")
assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
- logInfo("Checkpoint for time " + checkpointTime + " validated")
+ logInfo(s"Checkpoint for time $checkpointTime validated")
}
}
@@ -103,7 +103,10 @@ object Checkpoint extends Logging {
new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
}
- /** Get checkpoint files present in the give directory, ordered by oldest-first */
+ /**
+ * @param checkpointDir checkpoint directory to read checkpoint files from
+ * @return checkpoint files from the `checkpointDir` checkpoint directory, ordered by oldest-first
+ */
def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = {
def sortFunc(path1: Path, path2: Path): Boolean = {
@@ -121,11 +124,11 @@ object Checkpoint extends Logging {
val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
filtered.sortWith(sortFunc)
} else {
- logWarning("Listing " + path + " returned null")
+ logWarning(s"Listing $path returned null")
Seq.empty
}
} else {
- logInfo("Checkpoint directory " + path + " does not exist")
+ logWarning(s"Checkpoint directory $path does not exist")
Seq.empty
}
}
@@ -205,7 +208,7 @@ class CheckpointWriter(
// time of a batch is greater than the batch interval, checkpointing for completing an old
// batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
// batch actually has the latest information, so we want to recover from it. Therefore, we
- // also use the latest checkpoint time as the file name, so that we can recovery from the
+ // also use the latest checkpoint time as the file name, so that we can recover from the
// latest checkpoint file.
//
// Note: there is only one thread writing the checkpoint files, so we don't need to worry
@@ -216,8 +219,7 @@ class CheckpointWriter(
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
- logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
- + "'")
+ logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
// Write checkpoint to temp file
if (fs.exists(tempFile)) {
@@ -237,39 +239,38 @@ class CheckpointWriter(
fs.delete(backupFile, true) // just in case it exists
}
if (!fs.rename(checkpointFile, backupFile)) {
- logWarning("Could not rename " + checkpointFile + " to " + backupFile)
+ logWarning(s"Could not rename $checkpointFile to $backupFile")
}
}
// Rename temp file to the final checkpoint file
if (!fs.rename(tempFile, checkpointFile)) {
- logWarning("Could not rename " + tempFile + " to " + checkpointFile)
+ logWarning(s"Could not rename $tempFile to $checkpointFile")
}
// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
if (allCheckpointFiles.size > 10) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
- logInfo("Deleting " + file)
+ logInfo(s"Deleting $file")
fs.delete(file, true)
}
}
// All done, print success
val finishTime = System.currentTimeMillis()
- logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
- "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
+ logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" +
+ s", took ${bytes.length} bytes and ${finishTime - startTime} ms")
jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
return
} catch {
case ioe: IOException =>
- logWarning("Error in attempt " + attempts + " of writing checkpoint to "
- + checkpointFile, ioe)
+ val msg = s"Error in attempt $attempts of writing checkpoint to '$checkpointFile'"
+ logWarning(msg, ioe)
fs = null
}
}
- logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
- + checkpointFile + "'")
+ logWarning(s"Could not write checkpoint for time $checkpointTime to file '$checkpointFile'")
}
}
@@ -278,7 +279,7 @@ class CheckpointWriter(
val bytes = Checkpoint.serialize(checkpoint, conf)
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
- logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
+ logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to writer queue")
} catch {
case rej: RejectedExecutionException =>
logError("Could not submit checkpoint task to the thread pool executor", rej)
@@ -295,8 +296,8 @@ class CheckpointWriter(
executor.shutdownNow()
}
val endTime = System.currentTimeMillis()
- logInfo("CheckpointWriter executor terminated ? " + terminated +
- ", waited for " + (endTime - startTime) + " ms.")
+ logInfo(s"CheckpointWriter executor terminated? $terminated," +
+ s" waited for ${endTime - startTime} ms.")
stopped = true
}
}
@@ -336,20 +337,20 @@ object CheckpointReader extends Logging {
}
// Try to read the checkpoint files in the order
- logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
+ logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
var readError: Exception = null
checkpointFiles.foreach { file =>
- logInfo("Attempting to load checkpoint from file " + file)
+ logInfo(s"Attempting to load checkpoint from file $file")
try {
val fis = fs.open(file)
val cp = Checkpoint.deserialize(fis, conf)
- logInfo("Checkpoint successfully loaded from file " + file)
- logInfo("Checkpoint was generated at time " + cp.checkpointTime)
+ logInfo(s"Checkpoint successfully loaded from file $file")
+ logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
return Some(cp)
} catch {
case e: Exception =>
readError = e
- logWarning("Error reading checkpoint from file " + file, e)
+ logWarning(s"Error reading checkpoint from file $file", e)
}
}
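Most of the Checkpoint.scala hunks only move the logging calls from string concatenation to Scala's `s` interpolator; a minimal sketch showing the two forms produce identical text (the timestamp is arbitrary):

```scala
// Concatenation vs. the s"..." interpolator used in the updated logging calls.
val checkpointTime = 1462488867000L   // arbitrary example value
val concatenated = "Checkpoint for time " + checkpointTime + " validated"
val interpolated = s"Checkpoint for time $checkpointTime validated"
assert(concatenated == interpolated)
```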
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b494ef0dd9..7ea58afb53 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -158,7 +158,7 @@ private[spark] class Client(
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
reportLauncherState(SparkAppHandle.State.SUBMITTED)
- launcherBackend.setAppId(appId.toString())
+ launcherBackend.setAppId(appId.toString)
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
@@ -168,7 +168,7 @@ private[spark] class Client(
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
- logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+ logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
appId
} catch {