aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-12 21:31:04 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-12 21:31:04 -0800
commit0ab505a29e21b5a03928e0bbd3950f6f8e08ae32 (patch)
tree9c3e6038cc135a78b6d275493b795bb936ca979d /core
parent405bfe86ef9c3021358d2ac89192857478861fe0 (diff)
parent5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d (diff)
downloadspark-0ab505a29e21b5a03928e0bbd3950f6f8e08ae32.tar.gz
spark-0ab505a29e21b5a03928e0bbd3950f6f8e08ae32.tar.bz2
spark-0ab505a29e21b5a03928e0bbd3950f6f8e08ae32.zip
Merge pull request #395 from hsaputra/remove_simpleredundantreturn_scala
Remove simple redundant return statements for Scala methods/functions Remove simple redundant return statements for Scala methods/functions: -) Only change simple return statements at the end of method -) Ignore the complex if-else check -) Ignore the ones inside synchronized -) Add small changes to making var to val if possible and remove () for simple get This hopefully makes the review simpler =) Pass compile and tests.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulators.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/CacheManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/HttpFileServer.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/Logging.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/network/BufferMessage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/network/Connection.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/network/Message.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Pool.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockMessage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageLevel.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/util/Vector.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala14
43 files changed, 134 insertions, 134 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 5f73d234aa..e89ac28b8e 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -218,7 +218,7 @@ private object Accumulators {
def newId: Long = synchronized {
lastId += 1
- return lastId
+ lastId
}
def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 519ecde50a..8e5dd8a850 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
- return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
// Mark the split as loading (unless someone else marks it first)
@@ -74,7 +74,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
- return elements.iterator.asInstanceOf[Iterator[T]]
+ elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading.synchronized {
loading.remove(key)
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index ad1ee20045..a885898ad4 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -47,17 +47,17 @@ private[spark] class HttpFileServer extends Logging {
def addFile(file: File) : String = {
addFileToDir(file, fileDir)
- return serverUri + "/files/" + file.getName
+ serverUri + "/files/" + file.getName
}
def addJar(file: File) : String = {
addFileToDir(file, jarDir)
- return serverUri + "/jars/" + file.getName
+ serverUri + "/jars/" + file.getName
}
def addFileToDir(file: File, dir: File) : String = {
Files.copy(file, new File(dir, file.getName))
- return dir + "/" + file.getName
+ dir + "/" + file.getName
}
}
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 4a34989e50..9063cae87e 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -41,7 +41,7 @@ trait Logging {
}
log_ = LoggerFactory.getLogger(className)
}
- return log_
+ log_
}
// Log methods that take only a String
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 77b8ca1cce..80a611b180 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -139,7 +139,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
}
- else{
+ else {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
}
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 9b043f06dd..fc0a749882 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -53,9 +53,9 @@ object Partitioner {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
- return new HashPartitioner(rdd.context.defaultParallelism)
+ new HashPartitioner(rdd.context.defaultParallelism)
} else {
- return new HashPartitioner(bySize.head.partitions.size)
+ new HashPartitioner(bySize.head.partitions.size)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9a3d36b51e..55ac76bf63 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -786,8 +786,11 @@ class SparkContext(
private[spark] def getCallSite(): String = {
val callSite = getLocalProperty("externalCallSite")
- if (callSite == null) return Utils.formatSparkCallSite
- callSite
+ if (callSite == null) {
+ Utils.formatSparkCallSite
+ } else {
+ callSite
+ }
}
/**
@@ -937,7 +940,7 @@ class SparkContext(
*/
private[spark] def clean[F <: AnyRef](f: F): F = {
ClosureCleaner.clean(f)
- return f
+ f
}
/**
@@ -949,7 +952,7 @@ class SparkContext(
val path = new Path(dir, UUID.randomUUID().toString)
val fs = path.getFileSystem(hadoopConfiguration)
fs.mkdirs(path)
- fs.getFileStatus(path).getPath().toString
+ fs.getFileStatus(path).getPath.toString
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 618d95015f..4e63117a51 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -134,28 +134,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
format = conf.value.getOutputFormat()
.asInstanceOf[OutputFormat[AnyRef,AnyRef]]
}
- return format
+ format
}
private def getOutputCommitter(): OutputCommitter = {
if (committer == null) {
committer = conf.value.getOutputCommitter
}
- return committer
+ committer
}
private def getJobContext(): JobContext = {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
- return jobContext
+ jobContext
}
private def getTaskContext(): TaskAttemptContext = {
if (taskContext == null) {
taskContext = newTaskAttemptContext(conf.value, taID.value)
}
- return taskContext
+ taskContext
}
private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
@@ -182,19 +182,18 @@ object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
- return new JobID(jobtrackerID, id)
+ new JobID(jobtrackerID, id)
}
def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
}
- var outputPath = new Path(path)
+ val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
if (outputPath == null || fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
- outputPath = outputPath.makeQualified(fs)
- return outputPath
+ outputPath.makeQualified(fs)
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 40c519b5bd..82527fe663 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
- return new Iterator[Array[Byte]] {
+ val stdoutIterator = new Iterator[Array[Byte]] {
def next(): Array[Byte] = {
val obj = _nextObj
if (hasNext) {
@@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag](
def hasNext = _nextObj.length != 0
}
+ stdoutIterator
}
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index fdf92eca4f..1d295c62bc 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -203,16 +203,16 @@ extends Logging {
}
bais.close()
- var tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
+ val tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
tInfo.hasBlocks = blockNum
- return tInfo
+ tInfo
}
def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
totalBytes: Int,
totalBlocks: Int): T = {
- var retByteArray = new Array[Byte](totalBytes)
+ val retByteArray = new Array[Byte](totalBytes)
for (i <- 0 until totalBlocks) {
System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
index f736bb3713..fb4c65909a 100644
--- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
@@ -46,7 +46,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
throw new Exception("Max chunk size is " + maxChunkSize)
}
- if (size == 0 && gotChunkForSendingOnce == false) {
+ if (size == 0 && !gotChunkForSendingOnce) {
val newChunk = new MessageChunk(
new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null)
gotChunkForSendingOnce = true
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 95cb0206ac..cba8477ed5 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
// Is highly unlikely unless there was an unclean close of socket, etc
registerInterest()
logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
- return true
+ true
} catch {
case e: Exception => {
logWarning("Error finishing connection to " + address, e)
@@ -385,7 +385,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
}
// should not happen - to keep scala compiler happy
- return true
+ true
}
// This is a hack to determine if remote socket was closed or not.
@@ -559,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
}
}
// should not happen - to keep scala compiler happy
- return true
+ true
}
def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}
diff --git a/core/src/main/scala/org/apache/spark/network/Message.scala b/core/src/main/scala/org/apache/spark/network/Message.scala
index f2ecc6d439..2612884bdb 100644
--- a/core/src/main/scala/org/apache/spark/network/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/Message.scala
@@ -61,7 +61,7 @@ private[spark] object Message {
if (dataBuffers.exists(_ == null)) {
throw new Exception("Attempting to create buffer message with null buffer")
}
- return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
+ new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
}
def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage =
@@ -69,9 +69,9 @@ private[spark] object Message {
def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = {
if (dataBuffer == null) {
- return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
+ createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
} else {
- return createBufferMessage(Array(dataBuffer), ackId)
+ createBufferMessage(Array(dataBuffer), ackId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 546d921067..44204a8c46 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
val file = new File(subDir, blockId.name)
- return new FileSegment(file, 0, file.length())
+ new FileSegment(file, 0, file.length())
}
}
val sender = new ShuffleSender(port, pResovler)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 98da35763b..cefcc3d2d9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -295,10 +295,10 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val prefPartActual = prefPart.get
- if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
- return minPowerOfTwo // prefer balance over locality
- else {
- return prefPartActual // prefer locality over balance
+ if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
+ minPowerOfTwo // prefer balance over locality
+ } else {
+ prefPartActual // prefer locality over balance
}
}
@@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
*/
def run(): Array[PartitionGroup] = {
setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
- throwBalls() // assign partitions (balls) to each group (bins)
+ throwBalls() // assign partitions (balls) to each group (bins)
getPartitions
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 902083c24f..5cdb80be1d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -110,11 +110,11 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
// A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
- return conf.asInstanceOf[JobConf]
+ conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
- return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
@@ -122,7 +122,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
val newJobConf = new JobConf(broadcastedConf.value.value)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
- return newJobConf
+ newJobConf
}
}
@@ -138,7 +138,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
newInputFormat.asInstanceOf[Configurable].setConf(conf)
}
HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
- return newInputFormat
+ newInputFormat
}
override def getPartitions: Array[Partition] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 1dbbe39898..d4f396afb5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -96,7 +96,7 @@ class PipedRDD[T: ClassTag](
// Return an iterator that read lines from the process's stdout
val lines = Source.fromInputStream(proc.getInputStream).getLines
- return new Iterator[String] {
+ new Iterator[String] {
def next() = lines.next()
def hasNext = {
if (lines.hasNext) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f9dc12eee3..edd4f381db 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -764,7 +764,7 @@ abstract class RDD[T: ClassTag](
val entry = iter.next()
m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
}
- return m1
+ m1
}
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map
@@ -842,7 +842,7 @@ abstract class RDD[T: ClassTag](
partsScanned += numPartsToTry
}
- return buf.toArray
+ buf.toArray
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 90eb8a747f..cc10cc0849 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
}
- return retval.toSet
+ retval.toSet
}
// This method does not expect failures, since validate has already passed ...
@@ -121,18 +121,18 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem)
)
- return retval.toSet
+ retval.toSet
}
private def findPreferredLocations(): Set[SplitInfo] = {
logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
", inputFormatClazz : " + inputFormatClazz)
if (mapreduceInputFormat) {
- return prefLocsFromMapreduceInputFormat()
+ prefLocsFromMapreduceInputFormat()
}
else {
assert(mapredInputFormat)
- return prefLocsFromMapredInputFormat()
+ prefLocsFromMapredInputFormat()
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 1791242215..4bc13c23d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -75,12 +75,12 @@ private[spark] class Pool(
return schedulableNameToSchedulable(schedulableName)
}
for (schedulable <- schedulableQueue) {
- var sched = schedulable.getSchedulableByName(schedulableName)
+ val sched = schedulable.getSchedulableByName(schedulableName)
if (sched != null) {
return sched
}
}
- return null
+ null
}
override def executorLost(executorId: String, host: String) {
@@ -92,7 +92,7 @@ private[spark] class Pool(
for (schedulable <- schedulableQueue) {
shouldRevive |= schedulable.checkSpeculatableTasks()
}
- return shouldRevive
+ shouldRevive
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
@@ -101,7 +101,7 @@ private[spark] class Pool(
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
}
- return sortedTaskSetQueue
+ sortedTaskSetQueue
}
def increaseRunningTasks(taskNum: Int) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 3418640b8c..5e62c8468f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -37,9 +37,9 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
res = math.signum(stageId1 - stageId2)
}
if (res < 0) {
- return true
+ true
} else {
- return false
+ false
}
}
}
@@ -56,7 +56,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
- var res:Boolean = true
var compare:Int = 0
if (s1Needy && !s2Needy) {
@@ -70,11 +69,11 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
}
if (compare < 0) {
- return true
+ true
} else if (compare > 0) {
- return false
+ false
} else {
- return s1.name < s2.name
+ s1.name < s2.name
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index fc637884e2..17b1328b86 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -93,7 +93,7 @@ private[spark] class SparkListenerBus extends Logging {
* add overhead in the general case. */
Thread.sleep(10)
}
- return true
+ true
}
def stop(): Unit = post(SparkListenerShutdown)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 7cb3fe46e5..c60e9896de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -96,7 +96,7 @@ private[spark] class Stage(
def newAttemptId(): Int = {
val id = nextAttemptId
nextAttemptId += 1
- return id
+ id
}
val name = callSite.getOrElse(rdd.origin)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index e80cc6b0f6..9d3e615826 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -74,6 +74,6 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
def value(): T = {
val resultSer = SparkEnv.get.serializer.newInstance()
- return resultSer.deserialize(valueBytes)
+ resultSer.deserialize(valueBytes)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index c52d6175d2..35e9544718 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -37,7 +37,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
- return sparkEnv.closureSerializer.newInstance()
+ sparkEnv.closureSerializer.newInstance()
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a10e5397ad..fc0ee07089 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -228,7 +228,7 @@ private[spark] class TaskSetManager(
return Some(index)
}
}
- return None
+ None
}
/** Check whether a task is currently running an attempt on a given host */
@@ -291,7 +291,7 @@ private[spark] class TaskSetManager(
}
}
- return None
+ None
}
/**
@@ -332,7 +332,7 @@ private[spark] class TaskSetManager(
}
// Finally, if all else has failed, find a speculative task
- return findSpeculativeTask(execId, host, locality)
+ findSpeculativeTask(execId, host, locality)
}
/**
@@ -387,7 +387,7 @@ private[spark] class TaskSetManager(
case _ =>
}
}
- return None
+ None
}
/**
@@ -584,7 +584,7 @@ private[spark] class TaskSetManager(
}
override def getSchedulableByName(name: String): Schedulable = {
- return null
+ null
}
override def addSchedulable(schedulable: Schedulable) {}
@@ -594,7 +594,7 @@ private[spark] class TaskSetManager(
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
sortedTaskSetQueue += this
- return sortedTaskSetQueue
+ sortedTaskSetQueue
}
/** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
@@ -669,7 +669,7 @@ private[spark] class TaskSetManager(
}
}
}
- return foundTasks
+ foundTasks
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index e16d60c54c..c27049bdb5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -140,7 +140,7 @@ private[spark] class CoarseMesosSchedulerBackend(
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
- return command.build()
+ command.build()
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index b428c82a48..49781485d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -141,13 +141,13 @@ private[spark] class MesosSchedulerBackend(
// Serialize the map as an array of (String, String) pairs
execArgs = Utils.serialize(props.toArray)
}
- return execArgs
+ execArgs
}
private def setClassLoader(): ClassLoader = {
val oldClassLoader = Thread.currentThread.getContextClassLoader
Thread.currentThread.setContextClassLoader(classLoader)
- return oldClassLoader
+ oldClassLoader
}
private def restoreClassLoader(oldClassLoader: ClassLoader) {
@@ -255,7 +255,7 @@ private[spark] class MesosSchedulerBackend(
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(1).build())
.build()
- return MesosTaskInfo.newBuilder()
+ MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(createExecutorInfo(slaveId))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ff9f241fc1..e1fc0c707f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -412,7 +412,7 @@ private[spark] class BlockManager(
logDebug("The value of block " + blockId + " is null")
}
logDebug("Block " + blockId + " not found")
- return None
+ None
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 21f003609b..42f52d7b26 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -42,15 +42,15 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
logDebug("Parsed as a block message array")
val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
- return Some(new BlockMessageArray(responseMessages).toBufferMessage)
+ Some(new BlockMessageArray(responseMessages).toBufferMessage)
} catch {
case e: Exception => logError("Exception handling buffer message", e)
- return None
+ None
}
}
case otherMessage: Any => {
logError("Unknown type message received: " + otherMessage)
- return None
+ None
}
}
}
@@ -61,7 +61,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
logDebug("Received [" + pB + "]")
putBlock(pB.id, pB.data, pB.level)
- return None
+ None
}
case BlockMessage.TYPE_GET_BLOCK => {
val gB = new GetBlock(blockMessage.getId)
@@ -70,9 +70,9 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
if (buffer == null) {
return None
}
- return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
+ Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
}
- case _ => return None
+ case _ => None
}
}
@@ -93,7 +93,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
}
logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
+ " and got buffer " + buffer)
- return buffer
+ buffer
}
}
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
val blockMessageArray = new BlockMessageArray(blockMessage)
val resultMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
- return (resultMessage != None)
+ resultMessage != None
}
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
@@ -130,8 +130,8 @@ private[spark] object BlockManagerWorker extends Logging {
return blockMessage.getData
})
}
- case None => logDebug("No response message received"); return null
+ case None => logDebug("No response message received")
}
- return null
+ null
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
index 80dcb5a207..fbafcf79d2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
@@ -154,7 +154,7 @@ private[spark] class BlockMessage() {
println()
*/
val finishTime = System.currentTimeMillis
- return Message.createBufferMessage(buffers)
+ Message.createBufferMessage(buffers)
}
override def toString: String = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index a06f50a0ac..59329361f3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -96,7 +96,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
println()
println()
*/
- return Message.createBufferMessage(buffers)
+ Message.createBufferMessage(buffers)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 05f676c6e2..27f057b9f2 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -245,7 +245,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return false
}
}
- return true
+ true
}
override def contains(blockId: BlockId): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index b5596dffd3..0f84810d6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -74,7 +74,7 @@ class StorageLevel private(
if (deserialized_) {
ret |= 1
}
- return ret
+ ret
}
override def writeExternal(out: ObjectOutput) {
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 7108595e3e..1df6b87fb0 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -61,7 +61,7 @@ private[spark] object ClosureCleaner extends Logging {
return f.getType :: Nil // Stop at the first $outer that is not a closure
}
}
- return Nil
+ Nil
}
// Get a list of the outer objects for a given closure object.
@@ -74,7 +74,7 @@ private[spark] object ClosureCleaner extends Logging {
return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
}
}
- return Nil
+ Nil
}
private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
@@ -174,7 +174,7 @@ private[spark] object ClosureCleaner extends Logging {
field.setAccessible(true)
field.set(obj, outer)
}
- return obj
+ obj
}
}
}
@@ -182,7 +182,7 @@ private[spark] object ClosureCleaner extends Logging {
private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
+ new MethodVisitor(ASM4) {
override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
if (op == GETFIELD) {
for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
@@ -215,7 +215,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
+ new MethodVisitor(ASM4) {
override def visitMethodInsn(op: Int, owner: String, name: String,
desc: String) {
val argTypes = Type.getArgumentTypes(desc)
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index bddb3bb735..3cf94892e9 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -108,7 +108,7 @@ private[spark] object SizeEstimator extends Logging {
val bean = ManagementFactory.newPlatformMXBeanProxy(server,
hotSpotMBeanName, hotSpotMBeanClass)
// TODO: We could use reflection on the VMOption returned ?
- return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
+ getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
} catch {
case e: Exception => {
// Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
@@ -141,7 +141,7 @@ private[spark] object SizeEstimator extends Logging {
def dequeue(): AnyRef = {
val elem = stack.last
stack.trimEnd(1)
- return elem
+ elem
}
}
@@ -162,7 +162,7 @@ private[spark] object SizeEstimator extends Logging {
while (!state.isFinished) {
visitSingleObject(state.dequeue(), state)
}
- return state.size
+ state.size
}
private def visitSingleObject(obj: AnyRef, state: SearchState) {
@@ -276,11 +276,11 @@ private[spark] object SizeEstimator extends Logging {
// Create and cache a new ClassInfo
val newInfo = new ClassInfo(shellSize, pointerFields)
classInfos.put(cls, newInfo)
- return newInfo
+ newInfo
}
private def alignSize(size: Long): Long = {
val rem = size % ALIGN_SIZE
- return if (rem == 0) size else (size + ALIGN_SIZE - rem)
+ if (rem == 0) size else (size + ALIGN_SIZE - rem)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 23b72701c2..a98adf9835 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -73,14 +73,14 @@ private[spark] object Utils extends Logging {
val oos = new ObjectOutputStream(bos)
oos.writeObject(o)
oos.close()
- return bos.toByteArray
+ bos.toByteArray
}
/** Deserialize an object using Java serialization */
def deserialize[T](bytes: Array[Byte]): T = {
val bis = new ByteArrayInputStream(bytes)
val ois = new ObjectInputStream(bis)
- return ois.readObject.asInstanceOf[T]
+ ois.readObject.asInstanceOf[T]
}
/** Deserialize an object using Java serialization and the given ClassLoader */
@@ -90,7 +90,7 @@ private[spark] object Utils extends Logging {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
- return ois.readObject.asInstanceOf[T]
+ ois.readObject.asInstanceOf[T]
}
/** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
@@ -168,7 +168,7 @@ private[spark] object Utils extends Logging {
i += 1
}
}
- return buf
+ buf
}
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
@@ -452,7 +452,7 @@ private[spark] object Utils extends Logging {
def parseHostPort(hostPort: String): (String, Int) = {
{
// Check cache first.
- var cached = hostPortParseResults.get(hostPort)
+ val cached = hostPortParseResults.get(hostPort)
if (cached != null) return cached
}
@@ -755,7 +755,7 @@ private[spark] object Utils extends Logging {
} catch {
case ise: IllegalStateException => return true
}
- return false
+ false
}
def isSpace(c: Char): Boolean = {
@@ -772,7 +772,7 @@ private[spark] object Utils extends Logging {
var inWord = false
var inSingleQuote = false
var inDoubleQuote = false
- var curWord = new StringBuilder
+ val curWord = new StringBuilder
def endWord() {
buf += curWord.toString
curWord.clear()
@@ -818,7 +818,7 @@ private[spark] object Utils extends Logging {
if (inWord || inDoubleQuote || inSingleQuote) {
endWord()
}
- return buf
+ buf
}
/* Calculates 'x' modulo 'mod', takes to consideration sign of x,
@@ -846,8 +846,7 @@ private[spark] object Utils extends Logging {
/** Returns a copy of the system properties that is thread-safe to iterator over. */
def getSystemProperties(): Map[String, String] = {
- return System.getProperties().clone()
- .asInstanceOf[java.util.Properties].toMap[String, String]
+ System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index 62fd6d8da5..fcdf848637 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -27,7 +27,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
def + (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
- return Vector(length, i => this(i) + other(i))
+ Vector(length, i => this(i) + other(i))
}
def add(other: Vector) = this + other
@@ -35,7 +35,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
def - (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
- return Vector(length, i => this(i) - other(i))
+ Vector(length, i => this(i) - other(i))
}
def subtract(other: Vector) = this - other
@@ -49,7 +49,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
ans += this(i) * other(i)
i += 1
}
- return ans
+ ans
}
/**
@@ -69,7 +69,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
ans += (this(i) + plus(i)) * other(i)
i += 1
}
- return ans
+ ans
}
def += (other: Vector): Vector = {
@@ -104,7 +104,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
ans += (this(i) - other(i)) * (this(i) - other(i))
i += 1
}
- return ans
+ ans
}
def dist(other: Vector): Double = math.sqrt(squaredDist(other))
@@ -119,7 +119,7 @@ object Vector {
def apply(length: Int, initializer: Int => Double): Vector = {
val elements: Array[Double] = Array.tabulate(length)(initializer)
- return new Vector(elements)
+ new Vector(elements)
}
def zeros(length: Int) = new Vector(new Array[Double](length))
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index d98c7aa3d7..b8c852b4ff 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -75,7 +75,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K,
i += 1
}
}
- return null.asInstanceOf[V]
+ null.asInstanceOf[V]
}
/** Set the value for a key */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 7bf2020fe3..235d31709a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -64,7 +64,7 @@ class FakeTaskSetManager(
}
override def getSchedulableByName(name: String): Schedulable = {
- return null
+ null
}
override def executorLost(executorId: String, host: String): Unit = {
@@ -79,13 +79,14 @@ class FakeTaskSetManager(
{
if (tasksSuccessful + runningTasks < numTasks) {
increaseRunningTasks(1)
- return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+ Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+ } else {
+ None
}
- return None
}
override def checkSpeculatableTasks(): Boolean = {
- return true
+ true
}
def taskFinished() {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2aa259daf3..f0236ef1e9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
locations: Seq[Seq[String]] = Nil
): MyRDD = {
val maxPartition = numPartitions - 1
- return new MyRDD(sc, dependencies) {
+ val newRDD = new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
override def getPartitions = (0 to maxPartition).map(i => new Partition {
@@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Nil
override def toString: String = "DAGSchedulerSuiteRDD " + id
}
+ newRDD
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 5cc48ee00a..29102913c7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -42,12 +42,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage)
}
type MyRDD = RDD[(Int, Int)]
- def makeRdd(
- numPartitions: Int,
- dependencies: List[Dependency[_]]
- ): MyRDD = {
+ def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
val maxPartition = numPartitions - 1
- return new MyRDD(sc, dependencies) {
+ new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
override def getPartitions = (0 to maxPartition).map(i => new Partition {
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 0ed366fb70..de4871d043 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -61,8 +61,8 @@ class NonSerializable {}
object TestObject {
def run(): Int = {
var nonSer = new NonSerializable
- var x = 5
- return withSpark(new SparkContext("local", "test")) { sc =>
+ val x = 5
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + x).reduce(_ + _)
}
@@ -76,7 +76,7 @@ class TestClass extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + getX).reduce(_ + _)
}
@@ -88,7 +88,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + getX).reduce(_ + _)
}
@@ -103,7 +103,7 @@ class TestClassWithoutFieldAccess {
def run(): Int = {
var nonSer2 = new NonSerializable
var x = 5
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + x).reduce(_ + _)
}
@@ -115,7 +115,7 @@ object TestObjectWithNesting {
def run(): Int = {
var nonSer = new NonSerializable
var answer = 0
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
var y = 1
for (i <- 1 to 4) {
@@ -134,7 +134,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
var answer = 0
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
for (i <- 1 to 4) {
var nonSer2 = new NonSerializable