Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/CompletionIterator.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Distribution.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MutablePair.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SizeEstimator.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/BitSet.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala | 2
58 files changed, 104 insertions, 105 deletions
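The hunks below are mechanical Scala style cleanups: adding a space after commas in type parameters and argument lists, around '=' in named and default arguments, after 'if', and before ': _*' when spreading varargs, plus removing a couple of unused imports and stray double spaces. As an illustration only (this sketch is not part of the patch, and the object and member names are made up), the conventions being enforced look like this in isolation:

// Illustration only -- not part of the patch.
object StyleSketch {
  // Space after commas in type parameters and argument lists.
  val counts = scala.collection.mutable.Map[String, Int]("a" -> 1, "b" -> 2)

  // Space around '=' when passing named or default arguments.
  def lookup(key: String, default: Int = 0): Int = counts.getOrElse(key, default)
  val hits = lookup("a", default = 0)

  // Space after 'if' and after ':' in type annotations.
  def sign(x: Int): Int = if (x < 0) -1 else 1

  // Space before ': _*' when expanding a sequence into varargs.
  def total(xs: Int*): Int = xs.sum
  val result: Int = total(Seq(1, 2, 3) : _*)
}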
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 330df1d59a..5a8d17bd99 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -228,7 +228,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
- extends Accumulable[T,T](initialValue, param, name) {
+ extends Accumulable[T, T](initialValue, param, name) {
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index af9765d313..b8a5f50168 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -45,7 +45,7 @@ case class Aggregator[K, V, C] (
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!isSpillEnabled) {
- val combiners = new AppendOnlyMap[K,C]
+ val combiners = new AppendOnlyMap[K, C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
@@ -76,7 +76,7 @@ case class Aggregator[K, V, C] (
: Iterator[(K, C)] =
{
if (!isSpillEnabled) {
- val combiners = new AppendOnlyMap[K,C]
+ val combiners = new AppendOnlyMap[K, C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index b8d244408b..82889bcd30 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -103,7 +103,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
*/
class RangePartitioner[K : Ordering : ClassTag, V](
@transient partitions: Int,
- @transient rdd: RDD[_ <: Product2[K,V]],
+ @transient rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
@@ -185,7 +185,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
override def equals(other: Any): Boolean = other match {
- case r: RangePartitioner[_,_] =>
+ case r: RangePartitioner[_, _] =>
r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
case _ =>
false
@@ -249,7 +249,7 @@ private[spark] object RangePartitioner {
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
- def sketch[K:ClassTag](
+ def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
val shift = rdd.id
@@ -272,7 +272,7 @@ private[spark] object RangePartitioner {
* @param partitions number of partitions
* @return selected bounds
*/
- def determineBounds[K:Ordering:ClassTag](
+ def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b5e5d6f146..4b5bcb54aa 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -481,7 +481,7 @@ private[spark] object SparkConf extends Logging {
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
)
- Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
+ Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ea6c0dea08..a453c9bf48 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -389,7 +389,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
- _jars =_conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
+ _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
.toSeq.flatten
@@ -438,7 +438,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
- _env.securityManager,appName, startTime = startTime))
+ _env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[FixedLengthBinaryInputFormat],
classOf[LongWritable],
classOf[BytesWritable],
- conf=conf)
+ conf = conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
assert(bytes.length == recordLength, "Byte array does not have correct length")
@@ -1267,7 +1267,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R): Accumulable[R, T] = {
- val param = new GrowableAccumulableParam[R,T]
+ val param = new GrowableAccumulableParam[R, T]
val acc = new Accumulable(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
@@ -1316,7 +1316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
- case _ => path
+ case _ => path
}
val hadoopPath = new Path(schemeCorrectedPath)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 3271145428..a185954089 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -298,7 +298,7 @@ object SparkEnv extends Logging {
}
}
- val mapOutputTracker = if (isDriver) {
+ val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
@@ -348,7 +348,7 @@ object SparkEnv extends Logging {
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
val server = new HttpFileServer(conf, securityManager, fileServerPort)
server.initialize()
- conf.set("spark.fileserver.uri", server.serverUri)
+ conf.set("spark.fileserver.uri", server.serverUri)
server
} else {
null
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 2ec42d3aea..59ac82ccec 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -50,8 +50,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
private var jID: SerializableWritable[JobID] = null
private var taID: SerializableWritable[TaskAttemptID] = null
- @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
- @transient private var format: OutputFormat[AnyRef,AnyRef] = null
+ @transient private var writer: RecordWriter[AnyRef, AnyRef] = null
+ @transient private var format: OutputFormat[AnyRef, AnyRef] = null
@transient private var committer: OutputCommitter = null
@transient private var jobContext: JobContext = null
@transient private var taskContext: TaskAttemptContext = null
@@ -114,10 +114,10 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
// ********* Private Functions *********
- private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
+ private def getOutputFormat(): OutputFormat[AnyRef, AnyRef] = {
if (format == null) {
format = conf.value.getOutputFormat()
- .asInstanceOf[OutputFormat[AnyRef,AnyRef]]
+ .asInstanceOf[OutputFormat[AnyRef, AnyRef]]
}
format
}
@@ -138,7 +138,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
private def getTaskContext(): TaskAttemptContext = {
if (taskContext == null) {
- taskContext = newTaskAttemptContext(conf.value, taID.value)
+ taskContext = newTaskAttemptContext(conf.value, taID.value)
}
taskContext
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 74db764322..b8e15f38a2 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsWithIndex[R](
f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
- new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
+ new JavaRDD(rdd.mapPartitionsWithIndex(((a, b) => f(a, asJavaIterator(b))),
preservesPartitioning)(fakeClassTag))(fakeClassTag)
/**
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2d92f6a42b..a77bf42ce1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -723,7 +723,7 @@ private[spark] object PythonRDD extends Logging {
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
- converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
+ converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 0075d96371..026a1b9380 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -124,7 +124,7 @@ private[r] class RBackendHandler(server: RBackend)
}
throw new Exception(s"No matched method found for $cls.$methodName")
}
- val ret = methods.head.invoke(obj, args:_*)
+ val ret = methods.head.invoke(obj, args : _*)
// Write status bit
writeInt(dos, 0)
@@ -135,7 +135,7 @@ private[r] class RBackendHandler(server: RBackend)
matchMethod(numArgs, args, x.getParameterTypes)
}.head
- val obj = ctor.newInstance(args:_*)
+ val obj = ctor.newInstance(args : _*)
writeInt(dos, 0)
writeObject(dos, obj.asInstanceOf[AnyRef])
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 06247f7e8b..e020458888 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -309,7 +309,7 @@ private class StringRRDD[T: ClassTag](
}
private object SpecialLengths {
- val TIMING_DATA = -1
+ val TIMING_DATA = -1
}
private[r] class BufferedStreamThread(
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4457c75e8b..b69af639f7 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -125,7 +125,7 @@ private[broadcast] object HttpBroadcast extends Logging {
securityManager = securityMgr
if (isDriver) {
createServer(conf)
- conf.set("spark.httpBroadcast.uri", serverUri)
+ conf.set("spark.httpBroadcast.uri", serverUri)
}
serverUri = conf.get("spark.httpBroadcast.uri")
cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
@@ -187,7 +187,7 @@ private[broadcast] object HttpBroadcast extends Logging {
}
private def read[T: ClassTag](id: Long): T = {
- logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
+ logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
val url = serverUri + "/" + BroadcastBlockId(id).name
var uc: URLConnection = null
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index c048b78910..b4edb6109e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -65,7 +65,7 @@ private object FaultToleranceTest extends App with Logging {
private val workers = ListBuffer[TestWorkerInfo]()
private var sc: SparkContext = _
- private val zk = SparkCuratorUtil.newClient(conf)
+ private val zk = SparkCuratorUtil.newClient(conf)
private var numPassed = 0
private var numFailed = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 198371b70f..92bb5059a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -361,7 +361,7 @@ object SparkSubmit {
pyArchives = pythonPath.mkString(",")
}
- pyArchives = pyArchives.split(",").map { localPath=>
+ pyArchives = pyArchives.split(",").map { localPath =>
val localURI = Utils.resolveURI(localPath)
if (localURI.getScheme != "local") {
args.files = mergeFileLists(args.files, localURI.toString)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c8df024dda..ebc6cd76c6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -554,7 +554,7 @@ private[deploy] object Worker extends Logging {
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
+ masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 06152f16ae..d90ae405a0 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -261,7 +261,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
*/
private var _recordsRead: Long = _
def recordsRead: Long = _recordsRead
- def incRecordsRead(records: Long): Unit = _recordsRead += records
+ def incRecordsRead(records: Long): Unit = _recordsRead += records
/**
* Invoke the bytesReadCallback and mutate bytesRead.
diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
index cfd20392d1..390d148bc9 100644
--- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -60,7 +60,7 @@ trait SparkHadoopMapReduceUtil {
val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
.asInstanceOf[Class[Enum[_]]]
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
- taskTypeClass, if(isMap) "MAP" else "REDUCE")
+ taskTypeClass, if (isMap) "MAP" else "REDUCE")
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
diff --git a/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala
index b573f1a8a5..1a92a799d0 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala
@@ -110,7 +110,7 @@ private[nio] class BlockMessage() {
def getType: Int = typ
def getId: BlockId = id
def getData: ByteBuffer = data
- def getLevel: StorageLevel = level
+ def getLevel: StorageLevel = level
def toBufferMessage: BufferMessage = {
val buffers = new ArrayBuffer[ByteBuffer]()
diff --git a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
index 1ba25aa74a..7d0806f0c2 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
@@ -114,8 +114,8 @@ private[nio] object BlockMessageArray {
val blockMessages =
(0 until 10).map { i =>
if (i % 2 == 0) {
- val buffer = ByteBuffer.allocate(100)
- buffer.clear
+ val buffer = ByteBuffer.allocate(100)
+ buffer.clear()
BlockMessage.fromPutBlock(PutBlock(TestBlockId(i.toString), buffer,
StorageLevel.MEMORY_ONLY_SER))
} else {
diff --git a/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala
index 747a2088a7..232c552f98 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala
@@ -75,7 +75,7 @@ private[nio] class SecurityMessage extends Logging {
for (i <- 1 to idLength) {
idBuilder += buffer.getChar()
}
- connectionId = idBuilder.toString()
+ connectionId = idBuilder.toString()
val tokenLength = buffer.getInt()
token = new Array[Byte](tokenLength)
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index 3ef3cc219d..91b07ce3af 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -32,12 +32,12 @@ import org.apache.spark.util.collection.OpenHashMap
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {
+ extends ApproximateEvaluator[OpenHashMap[T, Long], Map[T, BoundedDouble]] {
var outputsMerged = 0
- var sums = new OpenHashMap[T,Long]() // Sum of counts for each key
+ var sums = new OpenHashMap[T, Long]() // Sum of counts for each key
- override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
+ override def merge(outputId: Int, taskResult: OpenHashMap[T, Long]) {
outputsMerged += 1
taskResult.foreach { case (key, value) =>
sums.changeValue(key, value, _ + value)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 0d130dd4c7..a4715e3437 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
if (fs.exists(cpath)) {
val dirContents = fs.listStatus(cpath).map(_.getPath)
val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
- val numPart = partitionFiles.length
+ val numPart = partitionFiles.length
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 0c1b02c07d..663eebb8e4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -310,11 +310,11 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
def throwBalls() {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
- for ((p,i) <- prev.partitions.zipWithIndex) {
+ for ((p, i) <- prev.partitions.zipWithIndex) {
groupArr(i).arr += p
}
} else { // no locality available, then simply split partitions based on positions in array
- for(i <- 0 until maxPartitions) {
+ for (i <- 0 until maxPartitions) {
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
(rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 8653cdee1a..004899f27b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -467,7 +467,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
- createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
+ createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
@@ -1011,7 +1011,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
jobFormat.checkOutputSpecs(job)
}
- val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
+ val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
val config = wrappedConf.value
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
@@ -1027,7 +1027,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
- val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+ val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
require(writer != null, "Unable to obtain RecordWriter")
var recordsWritten = 0L
Utils.tryWithSafeFinally {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d772f03f76..5fcef255e1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -454,7 +454,7 @@ abstract class RDD[T: ClassTag](
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] = {
- val numStDev = 10.0
+ val numStDev = 10.0
if (num < 0) {
throw new IllegalArgumentException("Negative number of elements requested")
@@ -1138,8 +1138,8 @@ abstract class RDD[T: ClassTag](
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
- val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
- val map = new OpenHashMap[T,Long]
+ val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) =>
+ val map = new OpenHashMap[T, Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
@@ -1585,15 +1585,15 @@ abstract class RDD[T: ClassTag](
case 0 => Seq.empty
case 1 =>
val d = rdd.dependencies.head
- debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true)
+ debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]], true)
case _ =>
val frontDeps = rdd.dependencies.take(len - 1)
val frontDepStrings = frontDeps.flatMap(
- d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]]))
+ d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]]))
val lastDep = rdd.dependencies.last
val lastDepStrings =
- debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true)
+ debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_, _, _]], true)
(frontDepStrings ++ lastDepStrings)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 3dfcf67f0e..4b5f15dd06 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -104,13 +104,13 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
if (!convertKey && !convertValue) {
self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
+ self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
+ self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
+ self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 633aeba3bb..f7cb1791d4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -125,7 +125,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
integrate(0, t => getSeq(t._1) += t._2)
// the second dep is rdd2; remove all of its keys
integrate(1, t => map.remove(t._1))
- map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
+ map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
}
override def clearDependencies() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index a96b6c3d23..81f40ad33a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -123,7 +123,7 @@ private[spark] class ZippedPartitionsRDD3
}
private[spark] class ZippedPartitionsRDD4
- [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
+ [A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
sc: SparkContext,
var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
var rdd1: RDD[A],
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a2299e907c..75a567fb31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1367,10 +1367,10 @@ class DAGScheduler(
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
- visited: HashSet[(RDD[_],Int)]): Seq[TaskLocation] = {
+ visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
- if (!visited.add((rdd,partition))) {
+ if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 12668b6c09..02c67073af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -17,9 +17,8 @@
package org.apache.spark.scheduler
-import com.codahale.metrics.{Gauge,MetricRegistry}
+import com.codahale.metrics.{Gauge, MetricRegistry}
-import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 5e62c8468f..864941d468 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -56,7 +56,7 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
- var compare:Int = 0
+ var compare: Int = 0
if (s1Needy && !s2Needy) {
return true
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 863d0befbc..9620915f49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -270,7 +270,7 @@ class StatsReportListener extends SparkListener with Logging {
private[spark] object StatsReportListener extends Logging {
// For profiling, the extremes are more interesting
- val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
+ val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
val probabilities = percentiles.map(_ / 100.0)
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
@@ -304,7 +304,7 @@ private[spark] object StatsReportListener extends Logging {
dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
}
- def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
+ def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
def f(d: Double): String = format.format(d)
showDistribution(heading, dOpt, f _)
}
@@ -318,7 +318,7 @@ private[spark] object StatsReportListener extends Logging {
}
def showBytesDistribution(
- heading:String,
+ heading: String,
getMetric: (TaskInfo, TaskMetrics) => Option[Long],
taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c4487d5b37..d473e51aba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -781,10 +781,10 @@ private[spark] class TaskSetManager(
// that it's okay if we add a task to the same queue twice (if it had multiple preferred
// locations), because dequeueTaskFromList will skip already-running tasks.
for (index <- getPendingTasksForExecutor(execId)) {
- addPendingTask(index, readding=true)
+ addPendingTask(index, readding = true)
}
for (index <- getPendingTasksForHost(host)) {
- addPendingTask(index, readding=true)
+ addPendingTask(index, readding = true)
}
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 70364cea62..4be1eda2e9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -75,7 +75,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
// Exchanged between the driver and the AM in Yarn client mode
- case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String)
+ case class AddWebUIFilter(
+ filterName: String, filterParams: Map[String, String], proxyBase: String)
extends CoarseGrainedClusterMessage
// Messages exchanged between the driver and the cluster manager for executor allocation
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 2a3a5d925d..190ff61d68 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -149,7 +149,7 @@ private[spark] abstract class YarnSchedulerBackend(
}
}
- override def onStop(): Unit ={
+ override def onStop(): Unit = {
askAmThreadPool.shutdownNow()
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index aff086594c..6b8edca5aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -52,7 +52,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index db0a080b3b..49de85ef48 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -146,7 +146,7 @@ private[spark] class MesosSchedulerBackend(
private def createExecArg(): Array[Byte] = {
if (execArgs == null) {
val props = new HashMap[String, String]
- for ((key,value) <- sc.conf.getAll) {
+ for ((key, value) <- sc.conf.getAll) {
props(key) = value
}
// Serialize the map as an array of (String, String) pairs
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 928c5cfed4..2f2934c249 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -108,7 +108,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
image: String,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
- portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None):Unit = {
+ portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 50608588f0..390c136df7 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -169,7 +169,7 @@ private[v1] object AllStagesResource {
val outputMetrics: Option[OutputMetricDistributions] =
new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
- def getSubmetrics(raw:InternalTaskMetrics): Option[InternalOutputMetrics] = {
+ def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = {
raw.outputMetrics
}
def build: OutputMetricDistributions = new OutputMetricDistributions(
@@ -284,7 +284,7 @@ private[v1] object AllStagesResource {
* the options (returning None if the metrics are all empty), and extract the quantiles for each
* metric. After creating an instance, call metricOption to get the result type.
*/
-private[v1] abstract class MetricHelper[I,O](
+private[v1] abstract class MetricHelper[I, O](
rawMetrics: Seq[InternalTaskMetrics],
quantiles: Array[Double]) {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index bf2cc2e72f..f73c742732 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -101,7 +101,7 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
@Path("applications/{appId}/stages")
- def getStages(@PathParam("appId") appId: String): AllStagesResource= {
+ def getStages(@PathParam("appId") appId: String): AllStagesResource = {
uiRoot.withSparkUI(appId, None) { ui =>
new AllStagesResource(ui)
}
@@ -110,14 +110,14 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
@Path("applications/{appId}/{attemptId}/stages")
def getStages(
@PathParam("appId") appId: String,
- @PathParam("attemptId") attemptId: String): AllStagesResource= {
+ @PathParam("attemptId") attemptId: String): AllStagesResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new AllStagesResource(ui)
}
}
@Path("applications/{appId}/stages/{stageId: \\d+}")
- def getStage(@PathParam("appId") appId: String): OneStageResource= {
+ def getStage(@PathParam("appId") appId: String): OneStageResource = {
uiRoot.withSparkUI(appId, None) { ui =>
new OneStageResource(ui)
}
@@ -171,7 +171,7 @@ private[spark] object ApiRootResource {
def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
jerseyContext.setContextPath("/api")
- val holder:ServletHolder = new ServletHolder(classOf[ServletContainer])
+ val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
holder.setInitParameter("com.sun.jersey.config.property.packages",
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
index 07b224fac4..dfdc09c6ca 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -25,7 +25,7 @@ import org.apache.spark.ui.SparkUI
private[v1] class OneRDDResource(ui: SparkUI) {
@GET
- def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
+ def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
throw new NotFoundException(s"no rdd found w/ id $rddId")
)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index ef3c8570d8..2bec64f2ef 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -134,7 +134,7 @@ class StageData private[spark](
val accumulatorUpdates: Seq[AccumulableInfo],
val tasks: Option[Map[Long, TaskData]],
- val executorSummary:Option[Map[String,ExecutorStageSummary]])
+ val executorSummary: Option[Map[String, ExecutorStageSummary]])
class TaskData private[spark](
val taskId: Long,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 543df4e135..7478ab0fc2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -40,7 +40,7 @@ class BlockManagerSlaveEndpoint(
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
// Operations that involve removing blocks may be slow and should be done asynchronously
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RemoveBlock(blockId) =>
doAsync[Boolean]("removing block " + blockId, context) {
blockManager.removeBlock(blockId)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 8569c6f3cb..c5ba9af3e2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -17,9 +17,8 @@
package org.apache.spark.storage
-import com.codahale.metrics.{Gauge,MetricRegistry}
+import com.codahale.metrics.{Gauge, MetricRegistry}
-import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source
private[spark] class BlockManagerSource(val blockManager: BlockManager)
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 0b11e914bb..3788916cf3 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -137,7 +137,7 @@ private[spark] object SparkUI {
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
- startTime: Long): SparkUI = {
+ startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 6194c50ec8..65162f4fdc 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -309,7 +309,7 @@ private[spark] object UIUtils extends Logging {
started: Int,
completed: Int,
failed: Int,
- skipped:Int,
+ skipped: Int,
total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 5fbcd6bb8a..ba03acdb38 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -54,7 +54,7 @@ private[spark] object UIWorkloadGenerator {
val sc = new SparkContext(conf)
def setProperties(s: String): Unit = {
- if(schedulingMode == SchedulingMode.FAIR) {
+ if (schedulingMode == SchedulingMode.FAIR) {
sc.setLocalProperty("spark.scheduler.pool", s)
}
sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index fbce917a08..36943978ff 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -33,7 +33,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
val rddId = parameterId.toInt
- val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener,includeDetails = true)
+ val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
.getOrElse {
// Rather than crashing, render an "RDD Not Found" page
return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7513b1b795..96aa2fe164 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -63,7 +63,7 @@ private[spark] object AkkaUtils extends Logging {
conf: SparkConf,
securityManager: SecurityManager): (ActorSystem, Int) = {
- val akkaThreads = conf.getInt("spark.akka.threads", 4)
+ val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
conf.get("spark.network.timeout", "120s"))
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index 9044aaeef2..31d230d0fe 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -42,7 +42,7 @@ abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterat
private[spark] object CompletionIterator {
def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A, I] = {
- new CompletionIterator[A,I](sub) {
+ new CompletionIterator[A, I](sub) {
def completion(): Unit = completionFunction
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala
index 9aea8efa38..1bab707235 100644
--- a/core/src/main/scala/org/apache/spark/util/Distribution.scala
+++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -35,7 +35,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
java.util.Arrays.sort(data, startIdx, endIdx)
val length = endIdx - startIdx
- val defaultProbabilities = Array(0,0.25,0.5,0.75,1.0)
+ val defaultProbabilities = Array(0, 0.25, 0.5, 0.75, 1.0)
/**
* Get the value of the distribution at the given probabilities. Probabilities should be
@@ -44,7 +44,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
*/
def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
: IndexedSeq[Double] = {
- probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
+ probabilities.toIndexedSeq.map { p: Double => data(closestIndex(p)) }
}
private def closestIndex(p: Double) = {
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 2bbfc988a9..a8bbad0868 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -89,7 +89,7 @@ private[spark] object MetadataCleaner {
conf: SparkConf,
cleanerType: MetadataCleanerType.MetadataCleanerType,
delay: Int) {
- conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
+ conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
index dad888548e..3d95b7869f 100644
--- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala
+++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
@@ -45,5 +45,5 @@ case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef
override def toString: String = "(" + _1 + "," + _2 + ")"
- override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_,_]]
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_, _]]
}
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index f38949c3cb..f1f6b5e1f9 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -54,14 +54,14 @@ object SizeEstimator extends Logging {
def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef])
// Sizes of primitive types
- private val BYTE_SIZE = 1
+ private val BYTE_SIZE = 1
private val BOOLEAN_SIZE = 1
- private val CHAR_SIZE = 2
- private val SHORT_SIZE = 2
- private val INT_SIZE = 4
- private val LONG_SIZE = 8
- private val FLOAT_SIZE = 4
- private val DOUBLE_SIZE = 8
+ private val CHAR_SIZE = 2
+ private val SHORT_SIZE = 2
+ private val INT_SIZE = 4
+ private val LONG_SIZE = 8
+ private val FLOAT_SIZE = 4
+ private val DOUBLE_SIZE = 8
// Fields can be primitive types, sizes are: 1, 2, 4, 8. Or fields can be pointers. The size of
// a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and UseCompressedOops flag.
@@ -96,7 +96,7 @@ object SizeEstimator extends Logging {
isCompressedOops = getIsCompressedOops
objectSize = if (!is64bit) 8 else {
- if(!isCompressedOops) {
+ if (!isCompressedOops) {
16
} else {
12
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b7a2473dfe..763d4db690 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -882,7 +882,7 @@ private[spark] object Utils extends Logging {
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
- def parseHostPort(hostPort: String): (String, Int) = {
+ def parseHostPort(hostPort: String): (String, Int) = {
// Check cache first.
val cached = hostPortParseResults.get(hostPort)
if (cached != null) {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 41cb8cfe2a..9c15b1188d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -161,7 +161,7 @@ class BitSet(numBits: Int) extends Serializable {
override def hasNext: Boolean = ind >= 0
override def next(): Int = {
val tmp = ind
- ind = nextSetBit(ind + 1)
+ ind = nextSetBit(ind + 1)
tmp
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
index 4f0bf8384a..9a7a5a4e74 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
@@ -90,9 +90,9 @@ class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K,
override def swap(data: Array[T], pos0: Int, pos1: Int) {
val tmpKey = data(2 * pos0)
val tmpVal = data(2 * pos0 + 1)
- data(2 * pos0) = data(2 * pos1)
+ data(2 * pos0) = data(2 * pos1)
data(2 * pos0 + 1) = data(2 * pos1 + 1)
- data(2 * pos1) = tmpKey
+ data(2 * pos1) = tmpKey
data(2 * pos1 + 1) = tmpVal
}
diff --git a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
index 9e29bf9d61..effe6fa2ad 100644
--- a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
@@ -196,7 +196,7 @@ private[spark] object StratifiedSamplingUtils extends Logging {
*
* The sampling function has a unique seed per partition.
*/
- def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
+ def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
fractions: Map[K, Double],
exact: Boolean,
seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {